From 5109998a798fd285f37c5f1cc43e81bff7e50137 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 10:53:00 +0900 Subject: [PATCH 01/20] Update gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 0270631..69e8d8f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ .settings/ target/ README.md.bak - +*.versionsBackup From 580d30d4a81924429641b0671c764056767ad48c Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 11:51:10 +0900 Subject: [PATCH 02/20] [ISSUE-12] first pass at adding broker override properties constructors --- CHANGELOG.md | 4 + .../kafka/test/KafkaTestServer.java | 131 +++++++++++++----- .../kafka/test/KafkaTestServerTest.java | 44 +++++- 3 files changed, 145 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d168217..8348d04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## 2.2.1 (5/16/2018) +- [Issue-12](https://github.com/salesforce/kafka-junit/issues/12) Added ability to pass broker properties to be used by test kafka service instance. +- Added helper method getAdminClient() on KafkaTestServer. + ## 2.2.0 (4/24/2018) - [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)! - [Issue-4](https://github.com/salesforce/kafka-junit/issues/4) Fix server configuration to allow for transactional producers & consumers. diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index 2c7dec8..71d7f20 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -25,6 +25,7 @@ package com.salesforce.kafka.test; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.io.Files; import kafka.server.KafkaConfig; @@ -76,6 +77,11 @@ public class KafkaTestServer implements AutoCloseable { */ private final String localHostname; + /** + * Defines overridden broker properties. + */ + private final Properties overrideBrokerProperties; + /** * Default constructor using "127.0.0.1" as the advertised host. */ @@ -89,6 +95,26 @@ public KafkaTestServer() { */ public KafkaTestServer(final String localHostname) { this.localHostname = localHostname; + this.overrideBrokerProperties = new Properties(); + } + + /** + * Alternative constructor allowing override of advertised host and brokerProperties. + * @param overrideBrokerProperties Define Kafka broker properties. + */ + public KafkaTestServer(final Properties overrideBrokerProperties) { + this.localHostname = "127.0.0.1"; + this.overrideBrokerProperties = overrideBrokerProperties; + } + + /** + * Alternative constructor allowing override of advertised host and brokerProperties. + * @param localHostname What IP or hostname to advertise services on. + * @param overrideBrokerProperties Define Kafka broker properties. + */ + public KafkaTestServer(final String localHostname, final Properties overrideBrokerProperties) { + this.localHostname = localHostname; + this.overrideBrokerProperties = overrideBrokerProperties; } /** @@ -129,46 +155,55 @@ public void start() throws Exception { zkServer = new TestingServer(zkInstanceSpec, true); final String zkConnectionString = getZookeeperServer().getConnectString(); - // Create temp path to store logs - final File logDir = Files.createTempDir(); - logDir.deleteOnExit(); + // Build properties using a baseline from overrideBrokerProperties. + final Properties brokerProperties = new Properties(overrideBrokerProperties); + + // Put required zookeeper connection properties. + setPropertyIfNotSet(brokerProperties, "zookeeper.connect", zkConnectionString); + + // Conditionally generate a port for kafka to use if not already defined. + kafkaPort = (String) setPropertyIfNotSet(brokerProperties, "port", String.valueOf(InstanceSpec.getRandomPort())); - // Determine what port to run kafka on - kafkaPort = String.valueOf(InstanceSpec.getRandomPort()); + // If log.dir is not set. + if (brokerProperties.getProperty("log.dir") == null) { + // Create temp path to store logs + final File logDir = Files.createTempDir(); + logDir.deleteOnExit(); - // Build properties - Properties kafkaProperties = new Properties(); - kafkaProperties.setProperty("zookeeper.connect", zkConnectionString); - kafkaProperties.setProperty("port", kafkaPort); - kafkaProperties.setProperty("log.dir", logDir.getAbsolutePath()); - kafkaProperties.setProperty("auto.create.topics.enable", "true"); - kafkaProperties.setProperty("zookeeper.session.timeout.ms", "30000"); - kafkaProperties.setProperty("broker.id", "1"); - kafkaProperties.setProperty("auto.offset.reset", "latest"); + // Set property. + brokerProperties.setProperty("log.dir", logDir.getAbsolutePath()); + } // Ensure that we're advertising appropriately - kafkaProperties.setProperty("host.name", localHostname); - kafkaProperties.setProperty("advertised.host.name", localHostname); - kafkaProperties.setProperty("advertised.port", kafkaPort); - kafkaProperties.setProperty("advertised.listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort); - kafkaProperties.setProperty("listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort); + setPropertyIfNotSet(brokerProperties, "host.name", localHostname); + setPropertyIfNotSet(brokerProperties, "advertised.host.name", localHostname); + setPropertyIfNotSet(brokerProperties, "advertised.port", kafkaPort); + setPropertyIfNotSet(brokerProperties, "advertised.listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort); + setPropertyIfNotSet(brokerProperties, "listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort); + + // Set other defaults if not defined. + setPropertyIfNotSet(brokerProperties, "auto.create.topics.enable", "true"); + setPropertyIfNotSet(brokerProperties, "zookeeper.session.timeout.ms", "30000"); + setPropertyIfNotSet(brokerProperties, "broker.id", "1"); + setPropertyIfNotSet(brokerProperties, "auto.offset.reset", "latest"); // Lower active threads. - kafkaProperties.setProperty("num.io.threads", "2"); - kafkaProperties.setProperty("num.network.threads", "2"); - kafkaProperties.setProperty("log.flush.interval.messages", "1"); + setPropertyIfNotSet(brokerProperties, "num.io.threads", "2"); + setPropertyIfNotSet(brokerProperties, "num.network.threads", "2"); + setPropertyIfNotSet(brokerProperties, "log.flush.interval.messages", "1"); // Define replication factor for internal topics to 1 - kafkaProperties.setProperty("offsets.topic.replication.factor", "1"); - kafkaProperties.setProperty("offset.storage.replication.factor", "1"); - kafkaProperties.setProperty("transaction.state.log.replication.factor", "1"); - kafkaProperties.setProperty("transaction.state.log.min.isr", "1"); - kafkaProperties.setProperty("transaction.state.log.num.partitions", "4"); - kafkaProperties.setProperty("config.storage.replication.factor", "1"); - kafkaProperties.setProperty("status.storage.replication.factor", "1"); - kafkaProperties.setProperty("default.replication.factor", "1"); - - final KafkaConfig config = new KafkaConfig(kafkaProperties); + setPropertyIfNotSet(brokerProperties, "offsets.topic.replication.factor", "1"); + setPropertyIfNotSet(brokerProperties, "offset.storage.replication.factor", "1"); + setPropertyIfNotSet(brokerProperties, "transaction.state.log.replication.factor", "1"); + setPropertyIfNotSet(brokerProperties, "transaction.state.log.min.isr", "1"); + setPropertyIfNotSet(brokerProperties, "transaction.state.log.num.partitions", "4"); + setPropertyIfNotSet(brokerProperties, "config.storage.replication.factor", "1"); + setPropertyIfNotSet(brokerProperties, "status.storage.replication.factor", "1"); + setPropertyIfNotSet(brokerProperties, "default.replication.factor", "1"); + + // Create and start kafka service. + final KafkaConfig config = new KafkaConfig(brokerProperties); kafka = new KafkaServerStartable(config); getKafkaServer().startup(); } @@ -191,7 +226,7 @@ public void createTopic(final String topicName, final int partitions) { final short replicationFactor = 1; // Create admin client - try (final AdminClient adminClient = KafkaAdminClient.create(buildDefaultClientConfig())) { + try (final AdminClient adminClient = getAdminClient()) { try { // Define topic final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); @@ -219,6 +254,14 @@ public void shutdown() throws Exception { close(); } + /** + * Creates a Kafka AdminClient connected to our test server. + * @return Kafka AdminClient instance. + */ + public AdminClient getAdminClient() { + return KafkaAdminClient.create(buildDefaultClientConfig()); + } + /** * Creates a kafka producer that is connected to our test server. * @param Type of message key @@ -324,6 +367,28 @@ private Map buildDefaultClientConfig() { return defaultClientConfig; } + /** + * Helper method to conditionally set a property if no value is already set. + * @param properties The properties instance to update. + * @param key The key to set if not already set. + * @param defaultValue The value to set if no value is already set for key. + * @return The value set. + */ + private Object setPropertyIfNotSet(final Properties properties, final String key, final String defaultValue) { + // Validate inputs + Preconditions.checkNotNull(properties); + Preconditions.checkNotNull(key); + + // Conditionally set the property if its not already set. + properties.setProperty( + key, + properties.getProperty(key, defaultValue) + ); + + // Return the value that is being used. + return properties.get(key); + } + /** * Closes the internal servers. Failing to call this at the end of your tests will likely * result in leaking instances. diff --git a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java index c344d77..fe67f95 100644 --- a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java +++ b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java @@ -25,24 +25,29 @@ package com.salesforce.kafka.test; +import com.google.common.collect.Iterables; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Collection; import java.util.Collections; import java.util.Properties; /** * Validation tests against KafkaTestServer class. */ -public class KafkaTestServerTest { +class KafkaTestServerTest { /** * Integration test validates that we can use transactional consumers and producers against the Test kafka instance. */ @@ -100,4 +105,41 @@ void testExactlyOnceTransaction() throws Exception { } } } + + /** + * Integration test validates that we can override broker properties. + */ + @Test + void testOverrideBrokerProperties() throws Exception { + final String expectedBrokerId = "22"; + + // Define our override property + final Properties overrideProperties = new Properties(); + overrideProperties.put("broker.id", expectedBrokerId); + + + // Create our test server instance passing override properties. + try (final KafkaTestServer kafkaTestServer = new KafkaTestServer(overrideProperties)) { + // Start service + kafkaTestServer.start(); + + // Create an AdminClient + try (final AdminClient adminClient = kafkaTestServer.getAdminClient()) { + // Describe details about the cluster + final DescribeClusterResult result = adminClient.describeCluster(); + + // Get details about the nodes + final Collection nodes = result.nodes().get(); + + // Sanity test + Assertions.assertEquals(1, nodes.size(), "Should only have a single node"); + + // Get details about our test broker/node + final Node node = Iterables.get(nodes, 0); + + // Validate + Assertions.assertEquals(expectedBrokerId, node.idString(), "Has expected overridden broker Id"); + } + } + } } \ No newline at end of file From cb3d799be084b642af79f2720de7f00528a93632 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 11:54:51 +0900 Subject: [PATCH 03/20] [ISSUE-12] simplify constructors, copy property values --- .../com/salesforce/kafka/test/KafkaTestServer.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index 71d7f20..3e9a68c 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -80,13 +80,13 @@ public class KafkaTestServer implements AutoCloseable { /** * Defines overridden broker properties. */ - private final Properties overrideBrokerProperties; + private final Properties overrideBrokerProperties = new Properties(); /** * Default constructor using "127.0.0.1" as the advertised host. */ public KafkaTestServer() { - this("127.0.0.1"); + this("127.0.0.1", new Properties()); } /** @@ -94,8 +94,7 @@ public KafkaTestServer() { * @param localHostname What IP or hostname to advertise services on. */ public KafkaTestServer(final String localHostname) { - this.localHostname = localHostname; - this.overrideBrokerProperties = new Properties(); + this(localHostname, new Properties()); } /** @@ -103,8 +102,7 @@ public KafkaTestServer(final String localHostname) { * @param overrideBrokerProperties Define Kafka broker properties. */ public KafkaTestServer(final Properties overrideBrokerProperties) { - this.localHostname = "127.0.0.1"; - this.overrideBrokerProperties = overrideBrokerProperties; + this("127.0.0.1", overrideBrokerProperties); } /** @@ -114,7 +112,9 @@ public KafkaTestServer(final Properties overrideBrokerProperties) { */ public KafkaTestServer(final String localHostname, final Properties overrideBrokerProperties) { this.localHostname = localHostname; - this.overrideBrokerProperties = overrideBrokerProperties; + + // Add passed in properties. + this.overrideBrokerProperties.putAll(overrideBrokerProperties); } /** From 2302938948941851b760c5227827835e9cb95e7c Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 11:58:33 +0900 Subject: [PATCH 04/20] [ISSUE-12] update test to validate properties are not mutable after the fact --- .../java/com/salesforce/kafka/test/KafkaTestServerTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java index fe67f95..0dd624e 100644 --- a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java +++ b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java @@ -120,6 +120,10 @@ void testOverrideBrokerProperties() throws Exception { // Create our test server instance passing override properties. try (final KafkaTestServer kafkaTestServer = new KafkaTestServer(overrideProperties)) { + // Lets try to be sneaky and change our local property after calling the constructor. + // This shouldn't have any effect on the properties already passed into the constructor. + overrideProperties.put("broker.id", "1000"); + // Start service kafkaTestServer.start(); From fe8626aa875a6abf4d66636de5ab66aee6f00766 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 12:27:43 +0900 Subject: [PATCH 05/20] [ISSUE-12] update Kafka-JUnit4 to support overriding broker properties --- .../test/junit4/SharedKafkaTestResource.java | 53 ++++++++++++++++++- ....java => SharedKafkaTestResourceTest.java} | 37 +++++++++++-- 2 files changed, 85 insertions(+), 5 deletions(-) rename kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/{KafkaTestServerTest.java => SharedKafkaTestResourceTest.java} (83%) diff --git a/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java index 5c3dfa0..c07a4e3 100644 --- a/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java +++ b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Properties; + /** * Creates and stands up an internal test kafka server to be shared across test cases within the same test class. * @@ -56,6 +58,55 @@ public class SharedKafkaTestResource extends ExternalResource { */ private KafkaTestUtils kafkaTestUtils = null; + /** + * Additional broker properties. + */ + private final Properties brokerProperties = new Properties(); + + /** + * Default constructor. + */ + public SharedKafkaTestResource() { + this(new Properties()); + } + + /** + * Constructor allowing passing additional broker properties. + * @param brokerProperties properties for Kafka broker. + */ + public SharedKafkaTestResource(final Properties brokerProperties) { + this.brokerProperties.putAll(brokerProperties); + } + + /** + * Helper to allow overriding Kafka broker properties. Can only be called prior to the service + * being started. + * @param name Kafka broker configuration property name. + * @param value Value to set for the configuration property. + * @return SharedKafkaTestResource instance for method chaining. + * @throws IllegalArgumentException if name argument is null. + * @throws IllegalStateException if method called after service has started. + */ + public SharedKafkaTestResource withBrokerProperty(final String name, final String value) { + // Validate input. + if (name == null) { + throw new IllegalArgumentException("Cannot pass null name argument"); + } + + // Validate state. + if (kafkaTestServer != null) { + throw new IllegalStateException("Cannot add properties after service has started"); + } + + // Add or set property. + if (value == null) { + brokerProperties.remove(name); + } else { + brokerProperties.put(name, value); + } + return this; + } + /** * Here we stand up an internal test kafka and zookeeper service. * Once for all tests that use this shared resource. @@ -66,7 +117,7 @@ protected void before() throws Exception { throw new IllegalStateException("Unknown State! Kafka Test Server already exists!"); } // Setup kafka test server - kafkaTestServer = new KafkaTestServer(); + kafkaTestServer = new KafkaTestServer(brokerProperties); kafkaTestServer.start(); } diff --git a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestServerTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java similarity index 83% rename from kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestServerTest.java rename to kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java index 817b535..7db3d19 100644 --- a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestServerTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java @@ -25,14 +25,17 @@ package com.salesforce.kafka.test.junit4; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.salesforce.kafka.test.KafkaTestServer; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; @@ -44,28 +47,35 @@ import org.slf4j.LoggerFactory; import java.time.Clock; +import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** - * Test of KafkaTestServer. + * Test of SharedKafkaTestResource. * * This also serves as an example of how to use this library! */ -public class KafkaTestServerTest { - private static final Logger logger = LoggerFactory.getLogger(KafkaTestServerTest.class); +public class SharedKafkaTestResourceTest { + private static final Logger logger = LoggerFactory.getLogger(SharedKafkaTestResourceTest.class); /** * We have a single embedded kafka server that gets started when this test class is initialized. * * It's automatically started before any methods are run via the @ClassRule annotation. * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. + * + * This example we override the Kafka broker id to '12', but this serves as an example of how you + * how you could override any Kafka broker property. */ @ClassRule - public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() + .withBrokerProperty("broker.id", "12"); /** * Before every test, we generate a random topic name and create it within the embedded kafka server. @@ -156,6 +166,25 @@ public void testCreatingTopicMultipleTimes() { assertTrue("Made it here!", true); } + /** + * Validate that broker Id was overridden correctly. + */ + @Test + public void testBrokerIdOverride() throws ExecutionException, InterruptedException { + try (final AdminClient adminClient = getKafkaTestServer().getAdminClient()) { + final Collection nodes = adminClient.describeCluster().nodes().get(); + + assertNotNull("Sanity test, should not be null", nodes); + assertEquals("Should have 1 entry", 1, nodes.size()); + + // Get details about our test broker/node + final Node node = Iterables.get(nodes, 0); + + // Validate + assertEquals("Has expected overridden broker Id", 12, node.id()); + } + } + /** * Simple accessor. */ From f532528041c287cebd12268b6cd99718bb4336dc Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 12:34:59 +0900 Subject: [PATCH 06/20] [ISSUE-12] Update README for Kafka-JUnit4 --- kafka-junit4/README.md | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/kafka-junit4/README.md b/kafka-junit4/README.md index 667a68b..aa25c77 100644 --- a/kafka-junit4/README.md +++ b/kafka-junit4/README.md @@ -105,7 +105,7 @@ Include this library in your project's POM with test scope. **You'll also need #### KafkaTestServer -A great example of how to use this can be found within our tests! Check out [KafkaTestServerTest.java](src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java) +A great example of how to use this can be found within our tests! Check out [SharedKafkaTestResourceTest.java](src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java). Add the following to your JUnit test file and it will handle automatically starting and stopping the embedded Kafka instance for you. @@ -121,6 +121,20 @@ instance for you. public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); ``` +SharedKafkaTestResource exposes the ability to override properties set on the test kafka broker instance. + +```java + /** + * This is an example of how to override configuration values for the test kafka broker instance. + * + * Here we define the broker.id to be set to 100, and disable topic auto-creation. + */ + @ClassRule + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() + .withBrokerProperty("broker.id", "1000") + .withBrokerProperty("auto.create.topics.enable", "false"); +``` + SharedKafkaTestResource has two accessors that you can make use of in your tests to interact with the service. ```java @@ -138,9 +152,9 @@ SharedKafkaTestResource has two accessors that you can make use of in your tests #### KafkaTestUtils Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal -kafka server. We've tried to collect some of these within [KafkaTestUtils](src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! +kafka server. We've tried to collect some of these within [KafkaTestUtils](../kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! -For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). +For usage and examples, check out it's test at [KafkaTestUtilsTest](../kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). #### Zookeeper Test Server From 38065e775bc1812a57e362515602e0f31c7e827d Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 12:38:14 +0900 Subject: [PATCH 07/20] [ISSUE-12] Update README for Kafka-JUnit4 --- kafka-junit4/README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka-junit4/README.md b/kafka-junit4/README.md index aa25c77..73939b0 100644 --- a/kafka-junit4/README.md +++ b/kafka-junit4/README.md @@ -154,7 +154,7 @@ SharedKafkaTestResource has two accessors that you can make use of in your tests Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal kafka server. We've tried to collect some of these within [KafkaTestUtils](../kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! -For usage and examples, check out it's test at [KafkaTestUtilsTest](../kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). +For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). #### Zookeeper Test Server @@ -162,8 +162,7 @@ For usage and examples, check out it's test at [KafkaTestUtilsTest](../kafka-jun both of these together within the same Test class. If you need to run tests against an **only** embedded Zookeeper server and not all of Kafka, we have you covered as well. Add the following - to your JUnit test file -and it will handle automatically start and stopping the embedded Zookeeper instance for you. + to your JUnit test class and it will handle automatically start and stopping the embedded Zookeeper instance for you. ```java /** From cd909e107e6df177da6ce9f23d117d6d565517b8 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 12:45:14 +0900 Subject: [PATCH 08/20] [ISSUE-12] code cleanup --- .../java/com/salesforce/kafka/test/KafkaTestServerTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java index 0dd624e..5095853 100644 --- a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java +++ b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java @@ -116,8 +116,7 @@ void testOverrideBrokerProperties() throws Exception { // Define our override property final Properties overrideProperties = new Properties(); overrideProperties.put("broker.id", expectedBrokerId); - - + // Create our test server instance passing override properties. try (final KafkaTestServer kafkaTestServer = new KafkaTestServer(overrideProperties)) { // Lets try to be sneaky and change our local property after calling the constructor. From beb432ee5e4aa16e198ca62921166d03d5895824 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 14:35:22 +0900 Subject: [PATCH 09/20] [ISSUE-12] Refactor Kafka-JUnit5 to use @RegisterExtension, add ability to set broker properties --- .../junit4/SharedKafkaTestResourceTest.java | 6 +- .../test/junit5/KafkaResourceExtension.java | 20 +-- .../test/junit5/SharedKafkaTestResource.java | 106 ++++++++++- .../junit5/SharedZookeeperTestResource.java | 64 ++++++- .../junit5/ZookeeperResourceExtension.java | 37 +--- ...t.java => KafkaResourceExtensionTest.java} | 11 +- .../kafka/test/junit5/KafkaTestUtilsTest.java | 16 +- .../junit5/SharedKafkaTestResourceTest.java | 169 ++++++++++++++++++ .../SharedZookeeperTestResourceTest.java | 48 +++++ ...va => ZookeeperResourseExtensionTest.java} | 9 +- 10 files changed, 406 insertions(+), 80 deletions(-) rename kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/{KafkaTestServerTest.java => KafkaResourceExtensionTest.java} (95%) create mode 100644 kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java create mode 100644 kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java rename kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/{ZookeeperTestServerTest.java => ZookeeperResourseExtensionTest.java} (92%) diff --git a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java index 7db3d19..9da73a5 100644 --- a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java @@ -70,12 +70,12 @@ public class SharedKafkaTestResourceTest { * It's automatically started before any methods are run via the @ClassRule annotation. * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. * - * This example we override the Kafka broker id to '12', but this serves as an example of how you + * This example we override the Kafka broker id to '1000', but this serves as an example of how you * how you could override any Kafka broker property. */ @ClassRule public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() - .withBrokerProperty("broker.id", "12"); + .withBrokerProperty("broker.id", "1000"); /** * Before every test, we generate a random topic name and create it within the embedded kafka server. @@ -181,7 +181,7 @@ public void testBrokerIdOverride() throws ExecutionException, InterruptedExcepti final Node node = Iterables.get(nodes, 0); // Validate - assertEquals("Has expected overridden broker Id", 12, node.id()); + assertEquals("Has expected overridden broker Id", 1000, node.id()); } } diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java index 08d5ebd..5c5e535 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java @@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory; /** + * @deprecated This class is superseded by SharedKafkaTestResource. + * * JUnit 5 extension to provide an internal test kafka server to be shared across test cases within the same test class. * * Annotate your test class with: @@ -50,6 +52,7 @@ * Within your test case methods: * this.sharedKafkaTestResource.getKafkaTestServer()... */ +@Deprecated public class KafkaResourceExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { private static final Logger logger = LoggerFactory.getLogger(KafkaResourceExtension.class); @@ -64,12 +67,8 @@ public class KafkaResourceExtension implements BeforeAllCallback, AfterAllCallba */ @Override public void beforeAll(ExtensionContext context) throws Exception { - logger.info("Starting kafka test server"); - // Start kafka test server - kafkaTestResource - .getKafkaTestServer() - .start(); + kafkaTestResource.beforeAll(context); } /** @@ -77,16 +76,7 @@ public void beforeAll(ExtensionContext context) throws Exception { */ @Override public void afterAll(ExtensionContext context) throws Exception { - logger.info("Shutting down kafka test server"); - - // Close out kafka test server if needed - try { - kafkaTestResource - .getKafkaTestServer() - .shutdown(); - } catch (final Exception e) { - throw new RuntimeException(e); - } + kafkaTestResource.afterAll(context); kafkaTestResource = null; } diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java index 10e42da..37ebd4b 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java @@ -28,21 +28,58 @@ import com.salesforce.kafka.test.KafkaTestServer; import com.salesforce.kafka.test.KafkaTestUtils; import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; /** - * Shared Kafka Test Resource instance. Contains references to internal Kafka and Zookeeper server instances. + * Creates and stands up an internal test kafka server to be shared across test cases within the same test class. + * + * Example within your Test class. + * + * @RegisterExtension + * public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); + * + * Within your test case method: + * sharedKafkaTestResource.getKafkaTestServer()... */ -public class SharedKafkaTestResource { +public class SharedKafkaTestResource implements BeforeAllCallback, AfterAllCallback { + private static final Logger logger = LoggerFactory.getLogger(SharedKafkaTestResource.class); + /** * Our internal Kafka Test Server instance. */ - private final KafkaTestServer kafkaTestServer = new KafkaTestServer(); + private KafkaTestServer kafkaTestServer = null; /** * Cached instance of KafkaTestUtils. */ private KafkaTestUtils kafkaTestUtils = null; + /** + * Additional broker properties. + */ + private final Properties brokerProperties = new Properties(); + + /** + * Default constructor. + */ + public SharedKafkaTestResource() { + this(new Properties()); + } + + /** + * Constructor allowing passing additional broker properties. + * @param brokerProperties properties for Kafka broker. + */ + public SharedKafkaTestResource(final Properties brokerProperties) { + this.brokerProperties.putAll(brokerProperties); + } + /** * @return Shared Kafka Test server instance. */ @@ -80,4 +117,67 @@ public String getZookeeperConnectString() { public String getKafkaConnectString() { return getKafkaTestServer().getKafkaConnectString(); } + + /** + * Helper to allow overriding Kafka broker properties. Can only be called prior to the service + * being started. + * @param name Kafka broker configuration property name. + * @param value Value to set for the configuration property. + * @return SharedKafkaTestResource instance for method chaining. + * @throws IllegalArgumentException if name argument is null. + * @throws IllegalStateException if method called after service has started. + */ + public SharedKafkaTestResource withBrokerProperty(final String name, final String value) { + // Validate input. + if (name == null) { + throw new IllegalArgumentException("Cannot pass null name argument"); + } + + // Validate state. + if (kafkaTestServer != null) { + throw new IllegalStateException("Cannot add properties after service has started"); + } + + // Add or set property. + if (value == null) { + brokerProperties.remove(name); + } else { + brokerProperties.put(name, value); + } + return this; + } + + /** + * Here we stand up an internal test kafka and zookeeper service. + * Once for all tests that use this shared resource. + */ + @Override + public void beforeAll(ExtensionContext context) throws Exception { + logger.info("Starting kafka test server"); + if (kafkaTestServer != null) { + throw new IllegalStateException("Unknown State! Kafka Test Server already exists!"); + } + // Setup kafka test server + kafkaTestServer = new KafkaTestServer(brokerProperties); + kafkaTestServer.start(); + } + + /** + * Here we shut down the internal test kafka and zookeeper services. + */ + @Override + public void afterAll(ExtensionContext context) throws Exception { + logger.info("Shutting down kafka test server"); + + // Close out kafka test server if needed + if (kafkaTestServer == null) { + return; + } + try { + kafkaTestServer.shutdown(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + kafkaTestServer = null; + } } diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java index 7469197..681d2e1 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java @@ -27,11 +27,20 @@ import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; /** * Shared Zookeeper Test Resource instance. Contains references to internal Zookeeper server instances. */ -public class SharedZookeeperTestResource { +public class SharedZookeeperTestResource implements BeforeAllCallback, AfterAllCallback { + private static final Logger logger = LoggerFactory.getLogger(SharedZookeeperTestResource.class); + /** * Our internal Zookeeper test server instance. */ @@ -39,16 +48,11 @@ public class SharedZookeeperTestResource { /** * @return Shared Zookeeper test server instance. + * @throws IllegalStateException if beforeAll() has not been called yet. */ public TestingServer getZookeeperTestServer() { if (zookeeperTestServer == null) { - // Setup zookeeper test server - final InstanceSpec zkInstanceSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, 1000); - try { - zookeeperTestServer = new TestingServer(zkInstanceSpec, true); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } + throw new IllegalStateException("Unable to get test server instance before being created!"); } return zookeeperTestServer; } @@ -59,4 +63,48 @@ public TestingServer getZookeeperTestServer() { public String getZookeeperConnectString() { return getZookeeperTestServer().getConnectString(); } + + /** + * Here we shut down the internal test zookeeper service. + */ + @Override + public void afterAll(ExtensionContext context) throws Exception { + logger.info("Shutting down zookeeper test server"); + + // If we don't have an instance + if (zookeeperTestServer == null) { + // Nothing to close. + return; + } + + try { + zookeeperTestServer.stop(); + zookeeperTestServer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // null out reference + zookeeperTestServer = null; + } + + /** + * Here we stand up an internal test zookeeper service. + * Once for all tests that use this shared resource. + */ + @Override + public void beforeAll(ExtensionContext context) throws Exception { + logger.info("Starting Zookeeper test server"); + if (zookeeperTestServer != null) { + throw new IllegalStateException("Unknown State! Zookeeper test server already exists!"); + } + + // Setup zookeeper test server + final InstanceSpec zkInstanceSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, 1000); + try { + zookeeperTestServer = new TestingServer(zkInstanceSpec, true); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } } diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java index 0b18697..ae110fe 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java @@ -25,19 +25,16 @@ package com.salesforce.kafka.test.junit5; -import org.apache.curator.test.TestingServer; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; import org.junit.jupiter.api.extension.ParameterResolutionException; import org.junit.jupiter.api.extension.ParameterResolver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; /** + * @deprecated This class is superseded by SharedZookeeperTestResource. + * * JUnit 5 extension to provide an internal test zookeeper server to be shared across test cases within the same test class. * * Annotate your test class with: @@ -54,34 +51,16 @@ * this.sharedZookeeperTestResource.getZookeeperTestServer()... * this.sharedZookeeperTestResource.getZookeeperConnectString()... */ +@Deprecated public class ZookeeperResourceExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { - private static final Logger logger = LoggerFactory.getLogger(ZookeeperResourceExtension.class); - - private SharedZookeeperTestResource zookeeperTestResource = null; + private final SharedZookeeperTestResource zookeeperTestResource = new SharedZookeeperTestResource(); /** * Here we shut down the internal test zookeeper service. */ @Override public void afterAll(ExtensionContext context) throws Exception { - logger.info("Shutting down zookeeper test server"); - - // If we don't have an instance - if (zookeeperTestResource == null) { - // Nothing to close. - return; - } - - try { - final TestingServer testingServer = zookeeperTestResource.getZookeeperTestServer(); - testingServer.stop(); - testingServer.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - // null out reference - zookeeperTestResource = null; + zookeeperTestResource.afterAll(context); } /** @@ -90,12 +69,8 @@ public void afterAll(ExtensionContext context) throws Exception { */ @Override public void beforeAll(ExtensionContext context) throws Exception { - logger.info("Starting Zookeeper test server"); - if (zookeeperTestResource != null) { - throw new IllegalStateException("Unknown State! Zookeeper test server already exists!"); - } // Setup zookeeper test server - zookeeperTestResource = new SharedZookeeperTestResource(); + zookeeperTestResource.beforeAll(context); } @Override diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestServerTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java similarity index 95% rename from kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestServerTest.java rename to kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java index 45cbc67..a8aaf6d 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestServerTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java @@ -51,13 +51,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Test of KafkaTestServer. + * @deprecated Please use SharedKafkaTestResourceTest as a reference for using this library. * - * This also serves as an example of how to use this library! + * This serves as an example of how to use this library using @ExtendWith annotation. */ +@Deprecated @ExtendWith(KafkaResourceExtension.class) -public class KafkaTestServerTest { - private static final Logger logger = LoggerFactory.getLogger(KafkaTestServerTest.class); +class KafkaResourceExtensionTest { + private static final Logger logger = LoggerFactory.getLogger(KafkaResourceExtensionTest.class); /** * We have a single embedded kafka server that gets started when this test class is initialized. @@ -72,7 +73,7 @@ public class KafkaTestServerTest { * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. * @param sharedKafkaTestResource Provided by KafkaResourceExtension. */ - public KafkaTestServerTest(final SharedKafkaTestResource sharedKafkaTestResource) { + public KafkaResourceExtensionTest(final SharedKafkaTestResource sharedKafkaTestResource) { this.sharedKafkaTestResource = sharedKafkaTestResource; } diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java index 4da7225..0f85890 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java @@ -32,7 +32,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +45,7 @@ /** * Serves both as a test for the Utilities, but also as a good example of how to use them. */ -@ExtendWith(KafkaResourceExtension.class) -public class KafkaTestUtilsTest { +class KafkaTestUtilsTest { private static final Logger logger = LoggerFactory.getLogger(KafkaTestUtilsTest.class); /** @@ -55,7 +54,8 @@ public class KafkaTestUtilsTest { * It's automatically started before any methods are run via the @ClassRule annotation. * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. */ - private final SharedKafkaTestResource sharedKafkaTestResource; + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); /** * Before every test, we generate a random topic name and create it within the embedded kafka server. @@ -63,14 +63,6 @@ public class KafkaTestUtilsTest { */ private String topicName; - /** - * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. - * @param sharedKafkaTestResource Provided by KafkaResourceExtension. - */ - public KafkaTestUtilsTest(SharedKafkaTestResource sharedKafkaTestResource) { - this.sharedKafkaTestResource = sharedKafkaTestResource; - } - /** * This happens once before every test method. * Create a new empty namespace with randomly generated name. diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java new file mode 100644 index 0000000..6ea6e24 --- /dev/null +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java @@ -0,0 +1,169 @@ +package com.salesforce.kafka.test.junit5; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.salesforce.kafka.test.KafkaTestServer; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test of SharedKafkaTestResource. + * + * This also serves as an example of how to use this library! + */ +class SharedKafkaTestResourceTest { + private static final Logger logger = LoggerFactory.getLogger(SharedKafkaTestResourceTest.class); + + /** + * We have a single embedded kafka server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run. + * It's automatically stopped after all of the tests are completed. + * + * This example we override the Kafka broker id to '12', but this serves as an example of how you + * how you could override any Kafka broker property. + */ + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() + .withBrokerProperty("broker.id", "1000"); + + /** + * Before every test, we generate a random topic name and create it within the embedded kafka server. + * Each test can then be segmented run any tests against its own topic. + */ + private String topicName; + + /** + * This happens once before every test method. + * Create a new empty namespace with randomly generated name. + */ + @BeforeEach + void beforeTest() { + // Generate topic name + topicName = getClass().getSimpleName() + Clock.systemUTC().millis(); + + // Create topic with a single partition, + // NOTE: This will create partition id 0, because partitions are indexed at 0 :) + getKafkaTestServer().createTopic(topicName, 1); + } + + /** + * Test that KafkaServer works as expected! + * + * This also serves as a decent example of how to use the producer and consumer. + */ + @Test + void testProducerAndConsumer() throws Exception { + final int partitionId = 0; + + // Define our message + final String expectedKey = "my-key"; + final String expectedValue = "my test message"; + + // Define the record we want to produce + ProducerRecord producerRecord = new ProducerRecord<>(topicName, partitionId, expectedKey, expectedValue); + + // Create a new producer + KafkaProducer producer = getKafkaTestServer().getKafkaProducer(StringSerializer.class, StringSerializer.class); + + // Produce it & wait for it to complete. + Future future = producer.send(producerRecord); + producer.flush(); + while (!future.isDone()) { + Thread.sleep(500L); + } + logger.info("Produce completed"); + + // Close producer! + producer.close(); + + KafkaConsumer kafkaConsumer = + getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class); + + final List topicPartitionList = Lists.newArrayList(); + for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { + topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + kafkaConsumer.assign(topicPartitionList); + kafkaConsumer.seekToBeginning(topicPartitionList); + + // Pull records from kafka, keep polling until we get nothing back + ConsumerRecords records; + do { + records = kafkaConsumer.poll(2000L); + logger.info("Found {} records in kafka", records.count()); + for (ConsumerRecord record: records) { + // Validate + assertEquals(expectedKey, record.key(), "Key matches expected"); + assertEquals(expectedValue, record.value(), "value matches expected"); + } + } + while (!records.isEmpty()); + + // close consumer + kafkaConsumer.close(); + } + + /** + * Test if we create a topic more than once, no errors occur. + */ + @Test + void testCreatingTopicMultipleTimes() { + final String myTopic = "myTopic"; + for (int creationCounter = 0; creationCounter < 5; creationCounter++) { + getKafkaTestServer().createTopic(myTopic); + } + assertTrue(true, "Made it here!"); + } + + /** + * Validate that broker Id was overridden correctly. + */ + @Test + void testBrokerIdOverride() throws ExecutionException, InterruptedException { + try (final AdminClient adminClient = getKafkaTestServer().getAdminClient()) { + final Collection nodes = adminClient.describeCluster().nodes().get(); + + assertNotNull(nodes, "Sanity test, should not be null"); + assertEquals(1, nodes.size(), "Should have 1 entry"); + + // Get details about our test broker/node + final Node node = Iterables.get(nodes, 0); + + // Validate + assertEquals(1000, node.id(), "Has expected overridden broker Id"); + } + } + + /** + * Simple accessor. + */ + private KafkaTestServer getKafkaTestServer() { + return sharedKafkaTestResource.getKafkaTestServer(); + } +} diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java new file mode 100644 index 0000000..7226703 --- /dev/null +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java @@ -0,0 +1,48 @@ +package com.salesforce.kafka.test.junit5; + +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test of SharedZookeeperTestResource. + * + * This also serves as an example of how to use this library! + */ +class SharedZookeeperTestResourceTest { + /** + * We have a single embedded zookeeper server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ExtendWith annotation. + * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. + * This instance is passed to this class's constructor via the @ExtendWith annotation. + */ + @RegisterExtension + public static final SharedZookeeperTestResource sharedZookeeperTestResource = new SharedZookeeperTestResource(); + + /** + * Validates that we receive a sane looking ZK connection string. + */ + @Test + void testGetZookeeperConnectString() { + final String actualConnectStr = sharedZookeeperTestResource.getZookeeperConnectString(); + + // Validate + assertNotNull(actualConnectStr, "Should have non-null connect string"); + assertTrue(actualConnectStr.startsWith("127.0.0.1:"), "Should start with 127.0.0.1"); + } + + /** + * Validates that we receive a sane looking ZK connection string. + */ + @Test + void testZookeeperServer() { + final TestingServer zkTestServer = sharedZookeeperTestResource.getZookeeperTestServer(); + + // Validate + assertNotNull(zkTestServer, "Should have non-null instance"); + } +} \ No newline at end of file diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperTestServerTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java similarity index 92% rename from kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperTestServerTest.java rename to kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java index e25438c..91e6e2b 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperTestServerTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java @@ -33,12 +33,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Test of Zookeeper Test instance. + * @deprecated Please use SharedZookeeperTestResourceTest as a reference for using this library. + * + * Test of ZookeeperResourceExtension. * * This also serves as an example of how to use this library! */ +@Deprecated @ExtendWith(ZookeeperResourceExtension.class) -public class ZookeeperTestServerTest { +public class ZookeeperResourseExtensionTest { /** * We have a single embedded zookeeper server that gets started when this test class is initialized. * @@ -52,7 +55,7 @@ public class ZookeeperTestServerTest { * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. * @param sharedZookeeperTestResource Provided by ZookeeperResourceExtension. */ - public ZookeeperTestServerTest(final SharedZookeeperTestResource sharedZookeeperTestResource) { + public ZookeeperResourseExtensionTest(final SharedZookeeperTestResource sharedZookeeperTestResource) { this.sharedZookeeperTestResource = sharedZookeeperTestResource; } From 38b59493b14a4b58bea84ad4858567ba3d5bf533 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 14:44:22 +0900 Subject: [PATCH 10/20] [ISSUE-12] Updated README for kafka-junit5 --- CHANGELOG.md | 1 + kafka-junit4/README.md | 8 +++--- kafka-junit5/README.md | 64 +++++++++++++++++++----------------------- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8348d04..e859513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## 2.2.1 (5/16/2018) - [Issue-12](https://github.com/salesforce/kafka-junit/issues/12) Added ability to pass broker properties to be used by test kafka service instance. +- Deprecated Kafka-JUnit5 @ExtendWith annotation implementations. This has been replaced in favor of @RegisterExtension annotation. Review [README.md](kafka-junit5/README.md) for more information on updated usage instructions. - Added helper method getAdminClient() on KafkaTestServer. ## 2.2.0 (4/24/2018) diff --git a/kafka-junit4/README.md b/kafka-junit4/README.md index 73939b0..fb43201 100644 --- a/kafka-junit4/README.md +++ b/kafka-junit4/README.md @@ -127,7 +127,7 @@ SharedKafkaTestResource exposes the ability to override properties set on the te /** * This is an example of how to override configuration values for the test kafka broker instance. * - * Here we define the broker.id to be set to 100, and disable topic auto-creation. + * Here we define the broker.id to be set to 1000, and disable topic auto-creation. */ @ClassRule public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() @@ -135,7 +135,7 @@ SharedKafkaTestResource exposes the ability to override properties set on the te .withBrokerProperty("auto.create.topics.enable", "false"); ``` -SharedKafkaTestResource has two accessors that you can make use of in your tests to interact with the service. +[SharedKafkaTestResource](src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java) instance has two accessors that you can make use of in your tests to interact with the service. ```java /** @@ -154,7 +154,7 @@ SharedKafkaTestResource has two accessors that you can make use of in your tests Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal kafka server. We've tried to collect some of these within [KafkaTestUtils](../kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! -For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). +For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java). #### Zookeeper Test Server @@ -175,7 +175,7 @@ If you need to run tests against an **only** embedded Zookeeper server and not a public static final SharedZookeeperTestResource sharedZookeeperTestResource = new SharedZookeeperTestResource(); ``` -SharedZookeeperTestResource has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. +[SharedZookeeperTestResource](src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java) has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. ```java /** diff --git a/kafka-junit5/README.md b/kafka-junit5/README.md index 1264b30..894691a 100644 --- a/kafka-junit5/README.md +++ b/kafka-junit5/README.md @@ -105,33 +105,37 @@ Include this library in your project's POM with test scope. **You'll also need #### KafkaTestServer -A great example of how to use this can be found within our tests! Check out [KafkaTestServerTest.java](src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java) +A great example of how to use this can be found within our tests! Check out [SharedKafkaTestResourceTest.java](src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java). -Annotate your JUnit test class with `@ExtendWith(KafkaResourceExtension.class)` and add the appropriate constructor. The JUnit5 extension will handle automatically starting and stopping the embedded Kafka +Add the following to your JUnit test file and it will handle automatically starting and stopping the embedded Kafka instance for you. ```java -@ExtendWith(KafkaResourceExtension.class) -public class MyTestClass { /** * We have a single embedded kafka server that gets started when this test class is initialized. * - * It's automatically started before any methods are run via the @ExtendWith annotation. - * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. - * This instance is passed to this class's constructor via the @ExtendWith annotation. + * It's automatically started before any methods are run via the @ClassRule annotation. + * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. */ - private final SharedKafkaTestResource sharedKafkaTestResource; + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); +``` + +SharedKafkaTestResource exposes the ability to override properties set on the test kafka broker instance. +```java /** - * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. - * @param sharedKafkaTestResource Provided by KafkaResourceExtension. + * This is an example of how to override configuration values for the test kafka broker instance. + * + * Here we define the broker.id to be set to 1000, and disable topic auto-creation. */ - public MyTestClass(final SharedKafkaTestResource sharedKafkaTestResource) { - this.sharedKafkaTestResource = sharedKafkaTestResource; - } + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() + .withBrokerProperty("broker.id", "1000") + .withBrokerProperty("auto.create.topics.enable", "false"); ``` -[SharedKafkaTestResource](kafka-junit5/src/main/java/test/junit/SharedKafkaTestResource.java) instance has two accessors that you can make use of in your tests to interact with the service. +[SharedKafkaTestResource](src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java) instance has two accessors that you can make use of in your tests to interact with the service. ```java /** @@ -148,40 +152,30 @@ public class MyTestClass { #### KafkaTestUtils Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal -kafka server. We've tried to collect some of these within [KafkaTestUtils](kafka-junit5/src/main/java/test/KafkaTestUtils.java)! +kafka server. We've tried to collect some of these within [KafkaTestUtils](../kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! -For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). +For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java). #### Zookeeper Test Server -**Note** Since Kafka depends on Zookeeper, you get this for *free* if you use the [KafkaResourceExtension](kafka-junit5/src/main/java/test/junit/KafkaResourceExtension.java), you do not, and should not, use -both of these together within the same Test class. +**Note** Since Kafka depends on Zookeeper, you get this for *free* if you use the SharedKafkaTestResource, you do not, and should not, use + both of these together within the same Test class. -If you need to run tests against an **only** embedded Zookeeper server and not all of Kafka, we have you covered as well. Add the following annotation to your JUnit test class - and it will handle automatically start and stopping the embedded Zookeeper instance for you. +If you need to run tests against an **only** embedded Zookeeper server and not all of Kafka, we have you covered as well. Add the following +to your JUnit test class and it will handle automatically start and stopping the embedded Zookeeper instance for you. ```java -@ExtendWith(ZookeeperResourceExtension.class) -public class MyTestClass { /** * We have a single embedded zookeeper server that gets started when this test class is initialized. * - * It's automatically started before any methods are run via the @ExtendWith annotation. - * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. - * This instance is passed to this class's constructor via the @ExtendWith annotation. - */ - private final SharedZookeeperTestResource sharedZookeeperTestResource; - - /** - * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. - * @param sharedZookeeperTestResource Provided by ZookeeperResourceExtension. + * It's automatically started before any methods are run via the @ClassRule annotation. + * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. */ - public MyTestClass(final SharedZookeeperTestResource sharedZookeeperTestResource) { - this.sharedZookeeperTestResource = sharedZookeeperTestResource; - } + @RegisterExtension + public static final SharedZookeeperTestResource sharedZookeeperTestResource = new SharedZookeeperTestResource(); ``` -[SharedZookeeperTestResource](kafka-junit5/src/main/java/test/junit/SharedZookeeperTestResource.java) has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. +[SharedZookeeperTestResource](src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java) has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. ```java /** From 60e24d050edddd546be38057c96ec569214954dc Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 14:56:43 +0900 Subject: [PATCH 11/20] [ISSUE-12] fix code style violations, update README --- kafka-junit4/README.md | 8 +++--- .../kafka/test/junit4/KafkaTestUtilsTest.java | 2 +- kafka-junit5/README.md | 10 ++++---- .../test/junit5/KafkaResourceExtension.java | 4 ++- .../junit5/ZookeeperResourceExtension.java | 4 ++- .../junit5/KafkaResourceExtensionTest.java | 4 +-- .../kafka/test/junit5/KafkaTestUtilsTest.java | 2 +- .../junit5/SharedKafkaTestResourceTest.java | 25 +++++++++++++++++++ .../SharedZookeeperTestResourceTest.java | 25 +++++++++++++++++++ .../ZookeeperResourseExtensionTest.java | 6 ++--- 10 files changed, 71 insertions(+), 19 deletions(-) diff --git a/kafka-junit4/README.md b/kafka-junit4/README.md index fb43201..117d072 100644 --- a/kafka-junit4/README.md +++ b/kafka-junit4/README.md @@ -20,7 +20,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.0 + 2.2.1 test ``` @@ -32,7 +32,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.0 + 2.2.1 test @@ -58,7 +58,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.0 + 2.2.1 test @@ -84,7 +84,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.0 + 2.2.1 test diff --git a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java index b5954be..843c8c8 100644 --- a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java @@ -86,7 +86,7 @@ public void testProducerAndConsumerUtils() { final int partitionId = 2; // Create our utility class - final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(getKafkaTestServer()); + final KafkaTestUtils kafkaTestUtils = sharedKafkaTestResource.getKafkaTestUtils(); // Produce some random records final List> producedRecordsList = diff --git a/kafka-junit5/README.md b/kafka-junit5/README.md index 894691a..1e0fcef 100644 --- a/kafka-junit5/README.md +++ b/kafka-junit5/README.md @@ -3,7 +3,7 @@ This library wraps Kafka Test Server and allows you to easily create and run tests against a "real" kafka server running within your tests, no more needing to stand up an external kafka cluster! -Kafka-JUnit5 is built on-top of **JUnit 5** as an Extension using the **@ExtendWith** annotation. +Kafka-JUnit5 is built on-top of **JUnit 5** as an Extension using the **@RegisterExtension** annotation. Kafka-JUnit5 works with Kafka versions **0.11.0.x**, **1.0.x**, and **1.1.x** and must be explicitly declared in your project's POM. @@ -20,7 +20,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.0 + 2.2.1 test ``` @@ -32,7 +32,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.0 + 2.2.1 test @@ -58,7 +58,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.0 + 2.2.1 test @@ -84,7 +84,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.0 + 2.2.1 test diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java index 5c5e535..5426245 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; /** - * @deprecated This class is superseded by SharedKafkaTestResource. + * This class is superseded by SharedKafkaTestResource. Please reference it instead of this class. * * JUnit 5 extension to provide an internal test kafka server to be shared across test cases within the same test class. * @@ -51,6 +51,8 @@ * * Within your test case methods: * this.sharedKafkaTestResource.getKafkaTestServer()... + * + * @deprecated This class is superseded by SharedKafkaTestResource. */ @Deprecated public class KafkaResourceExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java index ae110fe..0f706e9 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java @@ -33,7 +33,7 @@ import org.junit.jupiter.api.extension.ParameterResolver; /** - * @deprecated This class is superseded by SharedZookeeperTestResource. + * This class is superseded by SharedZookeeperTestResource. Please reference it instead of this class. * * JUnit 5 extension to provide an internal test zookeeper server to be shared across test cases within the same test class. * @@ -50,6 +50,8 @@ * Within your test case methods: * this.sharedZookeeperTestResource.getZookeeperTestServer()... * this.sharedZookeeperTestResource.getZookeeperConnectString()... + * + * @deprecated This class is superseded by SharedZookeeperTestResource. */ @Deprecated public class ZookeeperResourceExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java index a8aaf6d..2d7c911 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java @@ -51,9 +51,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** - * @deprecated Please use SharedKafkaTestResourceTest as a reference for using this library. - * * This serves as an example of how to use this library using @ExtendWith annotation. + * + * @deprecated Please use SharedKafkaTestResourceTest as a reference for using this library. */ @Deprecated @ExtendWith(KafkaResourceExtension.class) diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java index 0f85890..89c6ecb 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java @@ -86,7 +86,7 @@ void testProducerAndConsumerUtils() { final int partitionId = 2; // Create our utility class - final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(getKafkaTestServer()); + final KafkaTestUtils kafkaTestUtils = sharedKafkaTestResource.getKafkaTestUtils(); // Produce some random records final List> producedRecordsList = diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java index 6ea6e24..5b49c3b 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java @@ -1,3 +1,28 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + package com.salesforce.kafka.test.junit5; import com.google.common.collect.Iterables; diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java index 7226703..3605753 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResourceTest.java @@ -1,3 +1,28 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + package com.salesforce.kafka.test.junit5; import org.apache.curator.test.TestingServer; diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java index 91e6e2b..dfd6f09 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperResourseExtensionTest.java @@ -33,11 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** - * @deprecated Please use SharedZookeeperTestResourceTest as a reference for using this library. - * - * Test of ZookeeperResourceExtension. + * This serves as an example of how to use this library using @ExtendWith annotation. * - * This also serves as an example of how to use this library! + * @deprecated Please use SharedZookeeperTestResourceTest as a reference for using this library. */ @Deprecated @ExtendWith(ZookeeperResourceExtension.class) From bb99ec8b411d3448861906a565afe9b696aed4f2 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 15:10:41 +0900 Subject: [PATCH 12/20] [ISSUE-12] bump release version --- kafka-junit-core/pom.xml | 4 ++-- kafka-junit4/pom.xml | 6 +++--- kafka-junit5/pom.xml | 6 +++--- pom.xml | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/kafka-junit-core/pom.xml b/kafka-junit-core/pom.xml index 3a0979d..9777a05 100644 --- a/kafka-junit-core/pom.xml +++ b/kafka-junit-core/pom.xml @@ -5,12 +5,12 @@ kafka-junit com.salesforce.kafka.test - 2.2.0 + 2.2.1 4.0.0 kafka-junit-core - 2.2.0 + 2.2.1 diff --git a/kafka-junit4/pom.xml b/kafka-junit4/pom.xml index 0cec657..ad33ae3 100644 --- a/kafka-junit4/pom.xml +++ b/kafka-junit4/pom.xml @@ -32,13 +32,13 @@ kafka-junit com.salesforce.kafka.test - 2.2.0 + 2.2.1 4.0.0 kafka-junit4 - 2.2.0 + 2.2.1 @@ -53,7 +53,7 @@ com.salesforce.kafka.test kafka-junit-core - 2.2.0 + 2.2.1 diff --git a/kafka-junit5/pom.xml b/kafka-junit5/pom.xml index cf7bfb5..2009800 100644 --- a/kafka-junit5/pom.xml +++ b/kafka-junit5/pom.xml @@ -31,12 +31,12 @@ kafka-junit com.salesforce.kafka.test - 2.2.0 + 2.2.1 4.0.0 kafka-junit5 - 2.2.0 + 2.2.1 @@ -51,7 +51,7 @@ com.salesforce.kafka.test kafka-junit-core - 2.2.0 + 2.2.1 diff --git a/pom.xml b/pom.xml index a846939..db6e2a8 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ com.salesforce.kafka.test kafka-junit - 2.2.0 + 2.2.1 From 84a85b95f8ca8892ac5021e7355dacbc55e407ed Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 15:28:24 +0900 Subject: [PATCH 13/20] [ISSUE-12] Refactor KafkaTestServer constructors --- CHANGELOG.md | 5 +- .../kafka/test/KafkaTestServer.java | 62 +++++++++++-------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e859513..aeff696 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## 2.2.1 (5/16/2018) - [Issue-12](https://github.com/salesforce/kafka-junit/issues/12) Added ability to pass broker properties to be used by test kafka service instance. +- Added helper method getAdminClient() on KafkaTestServer to get a configured AdminClient instance. - Deprecated Kafka-JUnit5 @ExtendWith annotation implementations. This has been replaced in favor of @RegisterExtension annotation. Review [README.md](kafka-junit5/README.md) for more information on updated usage instructions. -- Added helper method getAdminClient() on KafkaTestServer. +- Deprecated KafkaTestServer constructor: `public KafkaTestServer(final String localHostname)` + + This constructor was replaced with the constructor `KafkaTestServer(final Properties overrideBrokerProperties)` where overrideBrokerProperties contains the property `host.name` set to the hostname or IP address Kafka should use. ## 2.2.0 (4/24/2018) - [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)! diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index 3e9a68c..2b64e94 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -56,6 +56,11 @@ * or use the AutoCloseable interface. */ public class KafkaTestServer implements AutoCloseable { + /** + * This defines the hostname the kafka instance will listen on by default. + */ + private static final String DEFAULT_HOSTNAME = "127.0.0.1"; + /** * Internal Test Zookeeper service. */ @@ -71,47 +76,41 @@ public class KafkaTestServer implements AutoCloseable { */ private String kafkaPort; - /** - * Defines what address the service advertises itself on. - * Sane default is 127.0.0.1. - */ - private final String localHostname; - /** * Defines overridden broker properties. */ private final Properties overrideBrokerProperties = new Properties(); /** - * Default constructor using "127.0.0.1" as the advertised host. + * Default constructor. */ public KafkaTestServer() { - this("127.0.0.1", new Properties()); + this(new Properties()); } /** * Alternative constructor allowing override of advertised host. * @param localHostname What IP or hostname to advertise services on. + * @deprecated Replaced with constructor: KafkaTestServer(final Properties overrideBrokerProperties) + * Set "host.name" property to the hostname you want kafka to listen on. */ + @Deprecated public KafkaTestServer(final String localHostname) { - this(localHostname, new Properties()); - } + this(new Properties()); - /** - * Alternative constructor allowing override of advertised host and brokerProperties. - * @param overrideBrokerProperties Define Kafka broker properties. - */ - public KafkaTestServer(final Properties overrideBrokerProperties) { - this("127.0.0.1", overrideBrokerProperties); + // Configure passed in hostname in broker properties. + overrideBrokerProperties.put("host.name", localHostname); } /** - * Alternative constructor allowing override of advertised host and brokerProperties. - * @param localHostname What IP or hostname to advertise services on. + * Alternative constructor allowing override of brokerProperties. * @param overrideBrokerProperties Define Kafka broker properties. */ - public KafkaTestServer(final String localHostname, final Properties overrideBrokerProperties) { - this.localHostname = localHostname; + public KafkaTestServer(final Properties overrideBrokerProperties) { + // Validate argument. + if (overrideBrokerProperties == null) { + throw new IllegalArgumentException("Cannot pass null overrideBrokerProperties argument"); + } // Add passed in properties. this.overrideBrokerProperties.putAll(overrideBrokerProperties); @@ -135,14 +134,14 @@ public KafkaServerStartable getKafkaServer() { * @return The proper connect string to use for Kafka. */ public String getKafkaConnectString() { - return localHostname + ":" + kafkaPort; + return getConfiguredHostname() + ":" + kafkaPort; } /** * @return The proper connect string to use for Zookeeper. */ public String getZookeeperConnectString() { - return localHostname + ":" + getZookeeperServer().getPort(); + return getConfiguredHostname() + ":" + getZookeeperServer().getPort(); } /** @@ -175,11 +174,11 @@ public void start() throws Exception { } // Ensure that we're advertising appropriately - setPropertyIfNotSet(brokerProperties, "host.name", localHostname); - setPropertyIfNotSet(brokerProperties, "advertised.host.name", localHostname); + setPropertyIfNotSet(brokerProperties, "host.name", getConfiguredHostname()); + setPropertyIfNotSet(brokerProperties, "advertised.host.name", getConfiguredHostname()); setPropertyIfNotSet(brokerProperties, "advertised.port", kafkaPort); - setPropertyIfNotSet(brokerProperties, "advertised.listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort); - setPropertyIfNotSet(brokerProperties, "listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort); + setPropertyIfNotSet(brokerProperties, "advertised.listeners", "PLAINTEXT://" + getConfiguredHostname() + ":" + kafkaPort); + setPropertyIfNotSet(brokerProperties, "listeners", "PLAINTEXT://" + getConfiguredHostname() + ":" + kafkaPort); // Set other defaults if not defined. setPropertyIfNotSet(brokerProperties, "auto.create.topics.enable", "true"); @@ -389,6 +388,17 @@ private Object setPropertyIfNotSet(final Properties properties, final String key return properties.get(key); } + /** + * Returns which hostname/IP address Kafka will bind/listen/advertise with. To change this value + * use the constructor: KafkaTestServer(final Properties overrideBrokerProperties) and set the property + * 'host.name' to the appropriate value. + * + * @return Which hostname/IP Kafka should bind/listen/advertise using. + */ + private String getConfiguredHostname() { + return overrideBrokerProperties.getProperty("host.name", DEFAULT_HOSTNAME); + } + /** * Closes the internal servers. Failing to call this at the end of your tests will likely * result in leaking instances. From 777cff07e0fc3ae8f10ab1950c03734a227eb2d8 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 15:32:31 +0900 Subject: [PATCH 14/20] [ISSUE-12] fix build warnings --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index db6e2a8..1d8dc47 100644 --- a/pom.xml +++ b/pom.xml @@ -182,6 +182,7 @@ **.yml **.yaml **.xml + **.versionsBackup script/** @@ -212,6 +213,7 @@ warning true false + false ${skipCheckStyle} From ab8528e7f7e8ab7720ef284314f2c80a5ea0922d Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 15:37:49 +0900 Subject: [PATCH 15/20] fix grammar --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aeff696..8a8fc48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Deprecated Kafka-JUnit5 @ExtendWith annotation implementations. This has been replaced in favor of @RegisterExtension annotation. Review [README.md](kafka-junit5/README.md) for more information on updated usage instructions. - Deprecated KafkaTestServer constructor: `public KafkaTestServer(final String localHostname)` - This constructor was replaced with the constructor `KafkaTestServer(final Properties overrideBrokerProperties)` where overrideBrokerProperties contains the property `host.name` set to the hostname or IP address Kafka should use. + This constructor was replaced with the constructor `KafkaTestServer(final Properties overrideBrokerProperties)` where overrideBrokerProperties should contain the property `host.name` set to the hostname or IP address Kafka should use. ## 2.2.0 (4/24/2018) - [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)! From d685b03c95d189887feed5e5288510ae304a5245 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Wed, 16 May 2018 17:29:12 +0900 Subject: [PATCH 16/20] [ISSUE-12] fix bug in default properties vs adding properties --- .../main/java/com/salesforce/kafka/test/KafkaTestServer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index 2b64e94..e16138e 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -155,7 +155,8 @@ public void start() throws Exception { final String zkConnectionString = getZookeeperServer().getConnectString(); // Build properties using a baseline from overrideBrokerProperties. - final Properties brokerProperties = new Properties(overrideBrokerProperties); + final Properties brokerProperties = new Properties(); + brokerProperties.putAll(overrideBrokerProperties); // Put required zookeeper connection properties. setPropertyIfNotSet(brokerProperties, "zookeeper.connect", zkConnectionString); From ca9e719aa2f12b0adb969bc57f92141cc1ef719a Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Thu, 17 May 2018 09:26:18 +0900 Subject: [PATCH 17/20] [ISSUE-12] set release version to 2.3.0 --- CHANGELOG.md | 2 +- kafka-junit-core/pom.xml | 4 ++-- kafka-junit4/README.md | 8 ++++---- kafka-junit4/pom.xml | 6 +++--- kafka-junit5/README.md | 8 ++++---- kafka-junit5/pom.xml | 6 +++--- pom.xml | 2 +- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a8fc48..8fdaf65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## 2.2.1 (5/16/2018) +## 2.3.0 (5/17/2018) - [Issue-12](https://github.com/salesforce/kafka-junit/issues/12) Added ability to pass broker properties to be used by test kafka service instance. - Added helper method getAdminClient() on KafkaTestServer to get a configured AdminClient instance. - Deprecated Kafka-JUnit5 @ExtendWith annotation implementations. This has been replaced in favor of @RegisterExtension annotation. Review [README.md](kafka-junit5/README.md) for more information on updated usage instructions. diff --git a/kafka-junit-core/pom.xml b/kafka-junit-core/pom.xml index 9777a05..7cf33b5 100644 --- a/kafka-junit-core/pom.xml +++ b/kafka-junit-core/pom.xml @@ -5,12 +5,12 @@ kafka-junit com.salesforce.kafka.test - 2.2.1 + 2.3.0 4.0.0 kafka-junit-core - 2.2.1 + 2.3.0 diff --git a/kafka-junit4/README.md b/kafka-junit4/README.md index 117d072..1c465f2 100644 --- a/kafka-junit4/README.md +++ b/kafka-junit4/README.md @@ -20,7 +20,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.1 + 2.3.0 test ``` @@ -32,7 +32,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.1 + 2.3.0 test @@ -58,7 +58,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.1 + 2.3.0 test @@ -84,7 +84,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit4 - 2.2.1 + 2.3.0 test diff --git a/kafka-junit4/pom.xml b/kafka-junit4/pom.xml index ad33ae3..f89b5e8 100644 --- a/kafka-junit4/pom.xml +++ b/kafka-junit4/pom.xml @@ -32,13 +32,13 @@ kafka-junit com.salesforce.kafka.test - 2.2.1 + 2.3.0 4.0.0 kafka-junit4 - 2.2.1 + 2.3.0 @@ -53,7 +53,7 @@ com.salesforce.kafka.test kafka-junit-core - 2.2.1 + 2.3.0 diff --git a/kafka-junit5/README.md b/kafka-junit5/README.md index 1e0fcef..1d1c60f 100644 --- a/kafka-junit5/README.md +++ b/kafka-junit5/README.md @@ -20,7 +20,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.1 + 2.3.0 test ``` @@ -32,7 +32,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.1 + 2.3.0 test @@ -58,7 +58,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.1 + 2.3.0 test @@ -84,7 +84,7 @@ Include this library in your project's POM with test scope. **You'll also need com.salesforce.kafka.test kafka-junit5 - 2.2.1 + 2.3.0 test diff --git a/kafka-junit5/pom.xml b/kafka-junit5/pom.xml index 2009800..fdca2a9 100644 --- a/kafka-junit5/pom.xml +++ b/kafka-junit5/pom.xml @@ -31,12 +31,12 @@ kafka-junit com.salesforce.kafka.test - 2.2.1 + 2.3.0 4.0.0 kafka-junit5 - 2.2.1 + 2.3.0 @@ -51,7 +51,7 @@ com.salesforce.kafka.test kafka-junit-core - 2.2.1 + 2.3.0 diff --git a/pom.xml b/pom.xml index 1d8dc47..536285c 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ com.salesforce.kafka.test kafka-junit - 2.2.1 + 2.3.0 From 820386c03580501d887e2f0636a46cdb436f81c7 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Thu, 17 May 2018 09:42:19 +0900 Subject: [PATCH 18/20] [ISSUE-12] DRY SharedZookeeperTestResource between Junit4 and Junit5 implementations --- .../kafka/test/ZookeeperTestServer.java | 103 ++++++++++++++++++ .../junit4/SharedZookeeperTestResource.java | 58 ++++------ .../junit5/SharedZookeeperTestResource.java | 67 +++--------- 3 files changed, 139 insertions(+), 89 deletions(-) create mode 100644 kafka-junit-core/src/main/java/com/salesforce/kafka/test/ZookeeperTestServer.java diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/ZookeeperTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/ZookeeperTestServer.java new file mode 100644 index 0000000..f9d074a --- /dev/null +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/ZookeeperTestServer.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test; + +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Wrapper around TestingServer zookeeper test server instance. This is here to 'DRY' out the code + * between the JUnit4 and JUnit5 implementations. + */ +public class ZookeeperTestServer { + private static final Logger logger = LoggerFactory.getLogger(ZookeeperTestServer.class); + + /** + * Our internal Zookeeper test server instance. + */ + private TestingServer zookeeperTestServer = null; + + /** + * Starts the internal Test zookeeper server instance. + */ + public void start() { + logger.info("Starting Zookeeper test server"); + if (zookeeperTestServer != null) { + throw new IllegalStateException("Unknown State! Zookeeper test server already exists!"); + } + + // Setup zookeeper test server + final InstanceSpec zkInstanceSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, 1000); + try { + zookeeperTestServer = new TestingServer(zkInstanceSpec, true); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + /** + * Stops the internal Test zookeeper server instance. + */ + public void stop() { + logger.info("Shutting down zookeeper test server"); + + // If we don't have an instance + if (zookeeperTestServer != null) { + try { + zookeeperTestServer.stop(); + zookeeperTestServer.close(); + + // null out reference + zookeeperTestServer = null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * @return Underlying shared Zookeeper test server instance. + * @throws IllegalStateException if start() has not been called yet. + */ + public TestingServer getZookeeperTestServer() { + if (zookeeperTestServer == null) { + throw new IllegalStateException("Unable to get test server instance before being created!"); + } + return zookeeperTestServer; + } + + /** + * @return Connection string to connect to the Zookeeper instance. + * @throws IllegalStateException if start() has not been called yet. + */ + public String getZookeeperConnectString() { + return getZookeeperTestServer().getConnectString(); + } +} diff --git a/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java index e6a1548..24f1725 100644 --- a/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java +++ b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java @@ -25,13 +25,9 @@ package com.salesforce.kafka.test.junit4; -import org.apache.curator.test.InstanceSpec; +import com.salesforce.kafka.test.ZookeeperTestServer; import org.apache.curator.test.TestingServer; import org.junit.rules.ExternalResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; /** * Creates and stands up an internal test Zookeeper server to be shared across test cases within the same test class. @@ -45,57 +41,41 @@ * sharedZookeeperTestResource.getZookeeperTestServer()... */ public class SharedZookeeperTestResource extends ExternalResource { - private static final Logger logger = LoggerFactory.getLogger(SharedZookeeperTestResource.class); - /** * Our internal Zookeeper test server instance. */ - private TestingServer zookeeperTestServer = null; + private final ZookeeperTestServer zookeeperTestServer = new ZookeeperTestServer(); /** - * Here we stand up an internal test zookeeper service. - * Once for all tests that use this shared resource. + * @return Shared Zookeeper test server instance. + * @throws IllegalStateException if before() has not been called yet. */ - protected void before() throws Exception { - logger.info("Starting Zookeeper test server"); - if (zookeeperTestServer != null) { - throw new IllegalStateException("Unknown State! Zookeeper test server already exists!"); - } - // Setup zookeeper test server - final InstanceSpec zkInstanceSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, 1000); - zookeeperTestServer = new TestingServer(zkInstanceSpec, true); + public TestingServer getZookeeperTestServer() throws IllegalStateException { + return zookeeperTestServer.getZookeeperTestServer(); } /** - * Here we shut down the internal test zookeeper service. + * @return Connection string to connect to the Zookeeper instance. + * @throws IllegalStateException if before() has not been called yet. */ - protected void after() { - logger.info("Shutting down zookeeper test server"); - - // Close out zookeeper test server if needed - if (zookeeperTestServer == null) { - return; - } - try { - zookeeperTestServer.stop(); - zookeeperTestServer.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - zookeeperTestServer = null; + public String getZookeeperConnectString() throws IllegalStateException { + return zookeeperTestServer.getZookeeperConnectString(); } /** - * @return Shared Zookeeper test server instance. + * Here we stand up an internal test zookeeper service. + * once for all tests that use this shared resource. + * @throws RuntimeException on startup errors. */ - public TestingServer getZookeeperTestServer() { - return zookeeperTestServer; + protected void before() throws RuntimeException { + zookeeperTestServer.start(); } /** - * @return Connection string to connect to the Zookeeper instance. + * Here we shut down the internal test zookeeper service. + * @throws RuntimeException on shutdown errors. */ - public String getZookeeperConnectString() { - return zookeeperTestServer.getConnectString(); + protected void after() throws RuntimeException { + zookeeperTestServer.stop(); } } \ No newline at end of file diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java index 681d2e1..d013e42 100644 --- a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java @@ -25,86 +25,53 @@ package com.salesforce.kafka.test.junit5; -import org.apache.curator.test.InstanceSpec; +import com.salesforce.kafka.test.ZookeeperTestServer; import org.apache.curator.test.TestingServer; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; /** * Shared Zookeeper Test Resource instance. Contains references to internal Zookeeper server instances. */ public class SharedZookeeperTestResource implements BeforeAllCallback, AfterAllCallback { - private static final Logger logger = LoggerFactory.getLogger(SharedZookeeperTestResource.class); - /** * Our internal Zookeeper test server instance. */ - private TestingServer zookeeperTestServer = null; + private final ZookeeperTestServer zookeeperTestServer = new ZookeeperTestServer(); /** * @return Shared Zookeeper test server instance. - * @throws IllegalStateException if beforeAll() has not been called yet. + * @throws IllegalStateException if before() has not been called yet. */ - public TestingServer getZookeeperTestServer() { - if (zookeeperTestServer == null) { - throw new IllegalStateException("Unable to get test server instance before being created!"); - } - return zookeeperTestServer; + public TestingServer getZookeeperTestServer() throws IllegalStateException { + return zookeeperTestServer.getZookeeperTestServer(); } /** * @return Connection string to connect to the Zookeeper instance. + * @throws IllegalStateException if before() has not been called yet. */ - public String getZookeeperConnectString() { - return getZookeeperTestServer().getConnectString(); + public String getZookeeperConnectString() throws IllegalStateException { + return zookeeperTestServer.getZookeeperConnectString(); } /** - * Here we shut down the internal test zookeeper service. + * Here we stand up an internal test zookeeper service. + * once for all tests that use this shared resource. + * @throws RuntimeException on startup errors. */ @Override - public void afterAll(ExtensionContext context) throws Exception { - logger.info("Shutting down zookeeper test server"); - - // If we don't have an instance - if (zookeeperTestServer == null) { - // Nothing to close. - return; - } - - try { - zookeeperTestServer.stop(); - zookeeperTestServer.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - // null out reference - zookeeperTestServer = null; + public void beforeAll(ExtensionContext context) throws RuntimeException { + zookeeperTestServer.start(); } /** - * Here we stand up an internal test zookeeper service. - * Once for all tests that use this shared resource. + * Here we shut down the internal test zookeeper service. + * @throws RuntimeException on shutdown errors. */ @Override - public void beforeAll(ExtensionContext context) throws Exception { - logger.info("Starting Zookeeper test server"); - if (zookeeperTestServer != null) { - throw new IllegalStateException("Unknown State! Zookeeper test server already exists!"); - } - - // Setup zookeeper test server - final InstanceSpec zkInstanceSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, 1000); - try { - zookeeperTestServer = new TestingServer(zkInstanceSpec, true); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } + public void afterAll(ExtensionContext context) { + zookeeperTestServer.stop(); } } From b76c5c3568786b6e2d6c12616c9d572202da3296 Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Thu, 17 May 2018 10:04:18 +0900 Subject: [PATCH 19/20] [ISSUE-12] general code cleanup --- .../kafka/test/KafkaTestServer.java | 12 +++-- .../salesforce/kafka/test/KafkaTestUtils.java | 18 +++---- .../junit4/SharedKafkaTestResourceTest.java | 51 +++++++++---------- .../junit5/KafkaResourceExtensionTest.java | 46 ++++++++--------- .../junit5/SharedKafkaTestResourceTest.java | 45 ++++++++-------- 5 files changed, 84 insertions(+), 88 deletions(-) diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index e16138e..01c0a05 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -25,8 +25,6 @@ package com.salesforce.kafka.test; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.google.common.io.Files; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; @@ -361,7 +359,7 @@ public KafkaConsumer getKafkaConsumer( * Internal helper method to build a default configuration. */ private Map buildDefaultClientConfig() { - Map defaultClientConfig = Maps.newHashMap(); + final Map defaultClientConfig = new HashMap<>(); defaultClientConfig.put("bootstrap.servers", getKafkaConnectString()); defaultClientConfig.put("client.id", "test-consumer-id"); return defaultClientConfig; @@ -376,8 +374,12 @@ private Map buildDefaultClientConfig() { */ private Object setPropertyIfNotSet(final Properties properties, final String key, final String defaultValue) { // Validate inputs - Preconditions.checkNotNull(properties); - Preconditions.checkNotNull(key); + if (properties == null) { + throw new NullPointerException("properties argument cannot be null."); + } + if (key == null) { + throw new NullPointerException("key argument cannot be null."); + } // Conditionally set the property if its not already set. properties.setProperty( diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java index ea5dab0..ce2403d 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java @@ -26,7 +26,6 @@ package com.salesforce.kafka.test; import com.google.common.base.Charsets; -import com.google.common.collect.Lists; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -76,18 +75,18 @@ public List> produceRecords( final int partitionId ) { // This holds the records we produced - List> producedRecords = Lists.newArrayList(); + final List> producedRecords = new ArrayList<>(); // This holds futures returned - List> producerFutures = Lists.newArrayList(); + final List> producerFutures = new ArrayList<>(); - KafkaProducer producer = kafkaTestServer.getKafkaProducer( + final KafkaProducer producer = kafkaTestServer.getKafkaProducer( ByteArraySerializer.class, ByteArraySerializer.class ); - for (Map.Entry entry: keysAndValues.entrySet()) { + for (final Map.Entry entry: keysAndValues.entrySet()) { // Construct filter - ProducerRecord record = new ProducerRecord<>(topicName, partitionId, entry.getKey(), entry.getValue()); + final ProducerRecord record = new ProducerRecord<>(topicName, partitionId, entry.getKey(), entry.getValue()); producedRecords.add(record); // Send it. @@ -100,7 +99,7 @@ public List> produceRecords( producer.close(); // Loop thru the futures, and build KafkaRecord objects - List> kafkaRecords = Lists.newArrayList(); + final List> kafkaRecords = new ArrayList<>(); try { for (int x = 0; x < keysAndValues.size(); x++) { final RecordMetadata metadata = producerFutures.get(x).get(); @@ -108,9 +107,8 @@ public List> produceRecords( kafkaRecords.add(ProducedKafkaRecord.newInstance(metadata, producerRecord)); } - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - throw new RuntimeException(e); + } catch (final InterruptedException | ExecutionException exception) { + throw new RuntimeException(exception); } return kafkaRecords; diff --git a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java index 9da73a5..36258f4 100644 --- a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java @@ -26,7 +26,6 @@ package com.salesforce.kafka.test.junit4; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.salesforce.kafka.test.KafkaTestServer; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -47,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.time.Clock; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; @@ -111,13 +111,13 @@ public void testProducerAndConsumer() throws Exception { final String expectedValue = "my test message"; // Define the record we want to produce - ProducerRecord producerRecord = new ProducerRecord<>(topicName, partitionId, expectedKey, expectedValue); + final ProducerRecord producerRecord = new ProducerRecord<>(topicName, partitionId, expectedKey, expectedValue); // Create a new producer - KafkaProducer producer = getKafkaTestServer().getKafkaProducer(StringSerializer.class, StringSerializer.class); + final KafkaProducer producer = getKafkaTestServer().getKafkaProducer(StringSerializer.class, StringSerializer.class); // Produce it & wait for it to complete. - Future future = producer.send(producerRecord); + final Future future = producer.send(producerRecord); producer.flush(); while (!future.isDone()) { Thread.sleep(500L); @@ -127,31 +127,30 @@ public void testProducerAndConsumer() throws Exception { // Close producer! producer.close(); - KafkaConsumer kafkaConsumer = - getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class); + // Create consumer + try (final KafkaConsumer kafkaConsumer = + getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class)) { - final List topicPartitionList = Lists.newArrayList(); - for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { - topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - kafkaConsumer.assign(topicPartitionList); - kafkaConsumer.seekToBeginning(topicPartitionList); - - // Pull records from kafka, keep polling until we get nothing back - ConsumerRecords records; - do { - records = kafkaConsumer.poll(2000L); - logger.info("Found {} records in kafka", records.count()); - for (ConsumerRecord record: records) { - // Validate - assertEquals("Key matches expected", expectedKey, record.key()); - assertEquals("value matches expected", expectedValue, record.value()); + final List topicPartitionList = new ArrayList<>(); + for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { + topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + kafkaConsumer.assign(topicPartitionList); + kafkaConsumer.seekToBeginning(topicPartitionList); + + // Pull records from kafka, keep polling until we get nothing back + ConsumerRecords records; + do { + records = kafkaConsumer.poll(2000L); + logger.info("Found {} records in kafka", records.count()); + for (ConsumerRecord record: records) { + // Validate + assertEquals("Key matches expected", expectedKey, record.key()); + assertEquals("value matches expected", expectedValue, record.value()); + } } + while (!records.isEmpty()); } - while (!records.isEmpty()); - - // close consumer - kafkaConsumer.close(); } /** diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java index 2d7c911..8727b17 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaResourceExtensionTest.java @@ -25,7 +25,6 @@ package com.salesforce.kafka.test.junit5; -import com.google.common.collect.Lists; import com.salesforce.kafka.test.KafkaTestServer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -44,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.time.Clock; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Future; @@ -127,31 +127,29 @@ void testProducerAndConsumer() throws Exception { // Close producer! producer.close(); - KafkaConsumer kafkaConsumer = - getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class); - - final List topicPartitionList = Lists.newArrayList(); - for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { - topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - kafkaConsumer.assign(topicPartitionList); - kafkaConsumer.seekToBeginning(topicPartitionList); - - // Pull records from kafka, keep polling until we get nothing back - ConsumerRecords records; - do { - records = kafkaConsumer.poll(2000L); - logger.info("Found {} records in kafka", records.count()); - for (ConsumerRecord record: records) { - // Validate - assertEquals(expectedKey, record.key(), "Key matches expected"); - assertEquals(expectedValue, record.value(), "value matches expected"); + // Create consumer + try (final KafkaConsumer kafkaConsumer = + getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class)) { + final List topicPartitionList = new ArrayList<>(); + for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { + topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); } + kafkaConsumer.assign(topicPartitionList); + kafkaConsumer.seekToBeginning(topicPartitionList); + + // Pull records from kafka, keep polling until we get nothing back + ConsumerRecords records; + do { + records = kafkaConsumer.poll(2000L); + logger.info("Found {} records in kafka", records.count()); + for (ConsumerRecord record: records) { + // Validate + assertEquals(expectedKey, record.key(), "Key matches expected"); + assertEquals(expectedValue, record.value(), "value matches expected"); + } + } + while (!records.isEmpty()); } - while (!records.isEmpty()); - - // close consumer - kafkaConsumer.close(); } /** diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java index 5b49c3b..2902ca8 100644 --- a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResourceTest.java @@ -26,7 +26,6 @@ package com.salesforce.kafka.test.junit5; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.salesforce.kafka.test.KafkaTestServer; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -47,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.time.Clock; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; @@ -127,31 +127,30 @@ void testProducerAndConsumer() throws Exception { // Close producer! producer.close(); - KafkaConsumer kafkaConsumer = - getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class); + // Create consumer + try (final KafkaConsumer kafkaConsumer = + getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class)) { - final List topicPartitionList = Lists.newArrayList(); - for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { - topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - kafkaConsumer.assign(topicPartitionList); - kafkaConsumer.seekToBeginning(topicPartitionList); - - // Pull records from kafka, keep polling until we get nothing back - ConsumerRecords records; - do { - records = kafkaConsumer.poll(2000L); - logger.info("Found {} records in kafka", records.count()); - for (ConsumerRecord record: records) { - // Validate - assertEquals(expectedKey, record.key(), "Key matches expected"); - assertEquals(expectedValue, record.value(), "value matches expected"); + final List topicPartitionList = new ArrayList<>(); + for (final PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topicName)) { + topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + kafkaConsumer.assign(topicPartitionList); + kafkaConsumer.seekToBeginning(topicPartitionList); + + // Pull records from kafka, keep polling until we get nothing back + ConsumerRecords records; + do { + records = kafkaConsumer.poll(2000L); + logger.info("Found {} records in kafka", records.count()); + for (ConsumerRecord record : records) { + // Validate + assertEquals(expectedKey, record.key(), "Key matches expected"); + assertEquals(expectedValue, record.value(), "value matches expected"); + } } + while (!records.isEmpty()); } - while (!records.isEmpty()); - - // close consumer - kafkaConsumer.close(); } /** From a8506c210f08b89529bbb2a5a54ceab0fa1bfecd Mon Sep 17 00:00:00 2001 From: Stephen Powis Date: Thu, 17 May 2018 10:17:35 +0900 Subject: [PATCH 20/20] fix codestyle violation --- .../kafka/test/junit4/SharedKafkaTestResourceTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java index 36258f4..a5b8fb0 100644 --- a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResourceTest.java @@ -114,7 +114,8 @@ public void testProducerAndConsumer() throws Exception { final ProducerRecord producerRecord = new ProducerRecord<>(topicName, partitionId, expectedKey, expectedValue); // Create a new producer - final KafkaProducer producer = getKafkaTestServer().getKafkaProducer(StringSerializer.class, StringSerializer.class); + final KafkaProducer producer = + getKafkaTestServer().getKafkaProducer(StringSerializer.class, StringSerializer.class); // Produce it & wait for it to complete. final Future future = producer.send(producerRecord);