Skip to content

Commit

Permalink
add support for explicitly configured broker ports (#39)
Browse files Browse the repository at this point in the history
* add support for explicitly configured broker ports

* update readme
  • Loading branch information
Crim authored Nov 14, 2019
1 parent 7f8ca7d commit 9ab7114
Show file tree
Hide file tree
Showing 18 changed files with 350 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 3.2.0 (11/13/2019)
- [ISSUE-38](https://github.com/salesforce/kafka-junit/issues/38) Optionally allow for explicitly defining which ports kakfa brokers listen on.

## 3.1.2 (11/08/2019)
- [ISSUE-36](https://github.com/salesforce/kafka-junit/issues/36) Temporary directories should now be cleaned up properly on JVM shutdown.

Expand Down
4 changes: 2 additions & 2 deletions kafka-junit-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<artifactId>kafka-junit</artifactId>
<groupId>com.salesforce.kafka.test</groupId>
<version>3.1.2</version>
<version>3.2.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-junit-core</artifactId>
<version>3.1.2</version>
<version>3.2.0</version>

<!-- defined properties -->
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -140,6 +142,9 @@ public void start() throws Exception, TimeoutException {
// Ensure zookeeper instance has been started.
zkTestServer.start();

// Validate listeners
validateListenerPorts();

// If we have no brokers defined yet...
if (brokers.isEmpty()) {
// Loop over brokers, starting with brokerId 1.
Expand Down Expand Up @@ -169,6 +174,42 @@ public void start() throws Exception, TimeoutException {
waitUntilClusterReady(10_000L);
}

private void validateListenerPorts() {
final Set<Integer> registeredPorts = new HashSet<>();

// Validate listeners that have explicitly defined ports.
for (final BrokerListener listener : registeredListeners) {
// Listeners with no explicitly defined ports will use a randomly assigned port.
if (listener.getPorts().length == 0) {
// Skip to next listener.
continue;
}

// Otherwise ensure we have at least 1 port per broker
else if (listener.getPorts().length < numberOfBrokers) {
// TODO Log warning that a random port will be used
logger.warn(
"{} will use at least one randomly generated port. "
+ "To avoid this warning assign the same number of ports via the onPorts() method to this listener "
+ "as brokers you have in your test cluster.",
listener.getClass().getSimpleName()
);
}

// Ensure ports are not duplicated.
for (final int port : listener.getPorts()) {
if (registeredPorts.contains(port)) {
throw new RuntimeException(
"Error configuring listener " + listener.getClass().getSimpleName() + " as port " + port + " "
+ "is already registered by a listener. Ensure that all explicitly defined ports passed to "
+ "BrokerListener.onPorts() is unique across all listeners."
);
}
registeredPorts.add(port);
}
}
}

/**
* Returns an immutable list of broker hosts for the kafka cluster.
* @return immutable list of hosts for brokers within the cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.salesforce.kafka.test.listeners.PlainListener;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -262,8 +261,8 @@ public void start() throws Exception {

// Loop over registered listeners and add each
for (final BrokerListener listener : registeredListeners) {
// Generate port to listen on.
final int port = InstanceSpec.getRandomPort();
// Get port to listen on.
final int port = listener.getNextPort();
final String listenerDefinition = listener.getProtocol() + "://" + getConfiguredHostname() + ":" + port;
listenerProperties.add(
new ListenerProperties(listener.getProtocol(), listenerDefinition, listener.getClientProperties())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright (c) 2017-2018, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
* following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
* disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
*
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.kafka.test.listeners;

import org.apache.curator.test.InstanceSpec;

/**
* Shared Listener class.
*
* @param <Self> reference to parent class.
*/
public abstract class AbstractListener<Self> implements BrokerListener {
/**
* Defines which port(s) to listen on.
*/
private int[] ports = {};
private int portIndex = 0;

/**
* Optionally allow for explicitly defining which ports this listener will bind to.
* Pass a unique port per broker running.
*
* If not explicitly called, random ports will be assigned to each listener and broker.
*
* @param ports the ports to bind to.
* @return self for method chaining.
*/
public Self onPorts(final int ... ports) {
this.ports = ports;
return (Self) this;
}

/**
* The ports configured.
* @return Configured ports.
*/
public int[] getPorts() {
return ports;
}

/**
* Internal method to get the next assigned port. If called more times than configured ports,
* this method will generate a random port to be used.
*
* @return next configured port to use.
*/
public int getNextPort() {
if (ports == null || ports.length == 0 || portIndex >= ports.length) {
// Return random Port
return InstanceSpec.getRandomPort();
}
return ports[portIndex++];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,18 @@ public interface BrokerListener {
* @return Properties to be registered on connecting client.
*/
Properties getClientProperties();

/**
* The ports configured.
* @return Configured ports.
*/
int[] getPorts();

/**
* Internal method to get the next assigned port. If called more times than configured ports,
* this method will generate a random port to be used.
*
* @return next configured port to use.
*/
int getNextPort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* Default implementation. Defines a PLAINTEXT listener.
*/
public class PlainListener implements BrokerListener {
public class PlainListener extends AbstractListener<PlainListener> {

@Override
public String getProtocol() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* In order to make use of this Listener, you **must** start the JVM with the following:
* -Djava.security.auth.login.config=/path/to/your/jaas.conf
*/
public class SaslPlainListener implements BrokerListener {
public class SaslPlainListener extends AbstractListener<SaslPlainListener> {
private static final Logger logger = LoggerFactory.getLogger(SaslPlainListener.class);

private String username = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* In order to make use of this Listener, you **must** start the JVM with the following:
* -Djava.security.auth.login.config=/path/to/your/jaas.conf
*/
public class SaslSslListener implements BrokerListener {
public class SaslSslListener extends AbstractListener<SaslSslListener> {
private static final Logger logger = LoggerFactory.getLogger(SaslSslListener.class);

// SASL Settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* Define and register an SSL listener on a Kafka broker.
*/
public class SslListener implements BrokerListener {
public class SslListener extends AbstractListener<SslListener> {

private String trustStoreFile = "";
private String trustStorePassword = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.salesforce.kafka.test.listeners.SaslPlainListener;
import com.salesforce.kafka.test.listeners.SaslSslListener;
import com.salesforce.kafka.test.listeners.SslListener;
import org.apache.curator.test.InstanceSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Node;
Expand All @@ -40,6 +41,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -459,6 +461,88 @@ private static Stream<Arguments> provideListeners() {
);
}

/**
* Test a cluster instance with listeners on specified ports.
*/
@Test
void testListenerWithSpecificPort() throws Exception {
// Explicitly define our port
final int exportedPort1 = InstanceSpec.getRandomPort();
final int exportedPort2 = InstanceSpec.getRandomPort();

// Create default plain listener
final BrokerListener plainListener = new PlainListener()
.onPorts(exportedPort1, exportedPort2);
final List<BrokerListener> listeners = Collections.singletonList(plainListener);

final String topicName = "TestTopic-" + System.currentTimeMillis();
final int expectedMsgCount = 2;
final int numberOfBrokers = 2;

// Speed up shutdown in our tests
final Properties overrideProperties = getDefaultBrokerOverrideProperties();

// Create our test server instance
try (final KafkaTestCluster kafkaTestCluster = new KafkaTestCluster(numberOfBrokers, overrideProperties, listeners)) {
// Start broker
kafkaTestCluster.start();

// Validate connect string is as expected.
final String connectString = kafkaTestCluster.getKafkaConnectString();
final String expectedConnectString = "PLAINTEXT://localhost:" + exportedPort1 + ",PLAINTEXT://localhost:" + exportedPort2;
Assertions.assertEquals(expectedConnectString, connectString, "Should be using our specified ports");

// Create KafkaTestUtils
final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(kafkaTestCluster);

// Create topic
kafkaTestUtils.createTopic(topicName, 1, (short) numberOfBrokers);

// Publish 2 messages into topic
kafkaTestUtils.produceRecords(expectedMsgCount, topicName, 0);

// Sanity test - Consume the messages back out before shutting down broker.
final List<ConsumerRecord<byte[], byte[]>> records = kafkaTestUtils.consumeAllRecordsFromTopic(topicName);
Assertions.assertNotNull(records);
Assertions.assertEquals(expectedMsgCount, records.size(), "Should have found 2 records.");
}
}

/**
* Test a cluster instance with listeners on specified ports, where a port is duplicated.
*/
@Test
void testListenerWithSpecificPortRepeated() throws Exception {
// Explicitly define our port
final int port1 = InstanceSpec.getRandomPort();
final int port2 = InstanceSpec.getRandomPort();
final int port3 = InstanceSpec.getRandomPort();

// Create plain listeners using the same port.
final BrokerListener plainListener1 = new PlainListener()
.onPorts(port1, port2);

final BrokerListener plainListener2 = new PlainListener()
.onPorts(port3, port1);

final List<BrokerListener> listeners = new ArrayList<>();
listeners.add(plainListener1);
listeners.add(plainListener2);

// Define how many brokers.
final int numberOfBrokers = 2;

// Speed up shutdown in our tests
final Properties overrideProperties = getDefaultBrokerOverrideProperties();

// Create our test server instance
try (final KafkaTestCluster kafkaTestCluster = new KafkaTestCluster(numberOfBrokers, overrideProperties, listeners)) {

// Start broker, this should throw an exception
Assertions.assertThrows(RuntimeException.class, kafkaTestCluster::start);
}
}

private Properties getDefaultBrokerOverrideProperties() {
// Speed up shutdown in our tests
final Properties overrideProperties = new Properties();
Expand Down
Loading

0 comments on commit 9ab7114

Please sign in to comment.