
[issue-12] Add ability to set broker properties. #13

Merged · 20 commits · May 17, 2018
2 changes: 1 addition & 1 deletion .gitignore
@@ -6,4 +6,4 @@
.settings/
target/
README.md.bak

*.versionsBackup
Contributor:

What is this for?

Collaborator (Author):

The maven plugin that updates release versions creates backups of POM files with this extension. This avoids having those checked in by accident.

8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,14 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.3.0 (5/17/2018)
- [Issue-12](https://github.com/salesforce/kafka-junit/issues/12) Added the ability to pass broker properties to be used by the test Kafka service instance.
- Added helper method getAdminClient() on KafkaTestServer to get a configured AdminClient instance.
- Deprecated Kafka-JUnit5 @ExtendWith annotation implementations. This has been replaced in favor of @RegisterExtension annotation. Review [README.md](kafka-junit5/README.md) for more information on updated usage instructions.
- Deprecated KafkaTestServer constructor: `public KafkaTestServer(final String localHostname)`

This constructor was replaced with the constructor `KafkaTestServer(final Properties overrideBrokerProperties)` where overrideBrokerProperties should contain the property `host.name` set to the hostname or IP address Kafka should use.
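As a quick sketch of the replacement usage (the helper class below is illustrative, not part of the library), the override properties are plain `java.util.Properties` with `host.name` set to the desired listen address:

```java
import java.util.Properties;

public class BrokerPropertiesExample {
    // Build the overrideBrokerProperties described above; "host.name" selects
    // the hostname or IP address the embedded Kafka broker should use.
    public static Properties buildOverrides(final String hostname) {
        final Properties overrides = new Properties();
        overrides.setProperty("host.name", hostname);
        return overrides;
    }

    public static void main(final String[] args) {
        final Properties overrides = buildOverrides("127.0.0.1");
        // A test would then construct: new KafkaTestServer(overrides)
        System.out.println(overrides.getProperty("host.name"));
    }
}
```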

## 2.2.0 (4/24/2018)
- [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)!
- [Issue-4](https://github.com/salesforce/kafka-junit/issues/4) Fix server configuration to allow for transactional producers & consumers.
4 changes: 2 additions & 2 deletions kafka-junit-core/pom.xml
@@ -5,12 +5,12 @@
<parent>
<artifactId>kafka-junit</artifactId>
<groupId>com.salesforce.kafka.test</groupId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-junit-core</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>

<!-- Module Dependencies -->
<dependencies>
KafkaTestServer.java
@@ -25,7 +25,6 @@

package com.salesforce.kafka.test;

import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
@@ -55,6 +54,11 @@
* or use the AutoCloseable interface.
*/
public class KafkaTestServer implements AutoCloseable {
/**
* This defines the hostname the kafka instance will listen on by default.
*/
private static final String DEFAULT_HOSTNAME = "127.0.0.1";

/**
* Internal Test Zookeeper service.
*/
@@ -71,24 +75,43 @@ public class KafkaTestServer {
private String kafkaPort;

/**
* Defines what address the service advertises itself on.
* Sane default is 127.0.0.1.
* Defines overridden broker properties.
*/
private final String localHostname;
private final Properties overrideBrokerProperties = new Properties();

/**
* Default constructor using "127.0.0.1" as the advertised host.
* Default constructor.
*/
public KafkaTestServer() {
this("127.0.0.1");
this(new Properties());
}

/**
* Alternative constructor allowing override of advertised host.
* @param localHostname What IP or hostname to advertise services on.
* @deprecated Replaced with constructor: KafkaTestServer(final Properties overrideBrokerProperties)
* Set "host.name" property to the hostname you want kafka to listen on.
*/
@Deprecated
public KafkaTestServer(final String localHostname) {
this.localHostname = localHostname;
this(new Properties());

// Configure passed in hostname in broker properties.
overrideBrokerProperties.put("host.name", localHostname);
}

/**
* Alternative constructor allowing override of brokerProperties.
* @param overrideBrokerProperties Define Kafka broker properties.
*/
public KafkaTestServer(final Properties overrideBrokerProperties) {
// Validate argument.
if (overrideBrokerProperties == null) {
throw new IllegalArgumentException("Cannot pass null overrideBrokerProperties argument");
}

// Add passed in properties.
this.overrideBrokerProperties.putAll(overrideBrokerProperties);
}

/**
@@ -109,14 +132,14 @@ public KafkaServerStartable getKafkaServer() {
* @return The proper connect string to use for Kafka.
*/
public String getKafkaConnectString() {
return localHostname + ":" + kafkaPort;
return getConfiguredHostname() + ":" + kafkaPort;
}

/**
* @return The proper connect string to use for Zookeeper.
*/
public String getZookeeperConnectString() {
return localHostname + ":" + getZookeeperServer().getPort();
return getConfiguredHostname() + ":" + getZookeeperServer().getPort();
}

/**
@@ -129,46 +152,56 @@ public void start() throws Exception {
zkServer = new TestingServer(zkInstanceSpec, true);
final String zkConnectionString = getZookeeperServer().getConnectString();

// Create temp path to store logs
final File logDir = Files.createTempDir();
logDir.deleteOnExit();
// Build properties using a baseline from overrideBrokerProperties.
final Properties brokerProperties = new Properties();
brokerProperties.putAll(overrideBrokerProperties);

// Determine what port to run kafka on
kafkaPort = String.valueOf(InstanceSpec.getRandomPort());
// Put required zookeeper connection properties.
setPropertyIfNotSet(brokerProperties, "zookeeper.connect", zkConnectionString);

// Build properties
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("zookeeper.connect", zkConnectionString);
kafkaProperties.setProperty("port", kafkaPort);
kafkaProperties.setProperty("log.dir", logDir.getAbsolutePath());
kafkaProperties.setProperty("auto.create.topics.enable", "true");
kafkaProperties.setProperty("zookeeper.session.timeout.ms", "30000");
kafkaProperties.setProperty("broker.id", "1");
kafkaProperties.setProperty("auto.offset.reset", "latest");
// Conditionally generate a port for kafka to use if not already defined.
kafkaPort = (String) setPropertyIfNotSet(brokerProperties, "port", String.valueOf(InstanceSpec.getRandomPort()));

// If log.dir is not set.
if (brokerProperties.getProperty("log.dir") == null) {
// Create temp path to store logs
final File logDir = Files.createTempDir();
logDir.deleteOnExit();

// Set property.
brokerProperties.setProperty("log.dir", logDir.getAbsolutePath());
}

// Ensure that we're advertising appropriately
kafkaProperties.setProperty("host.name", localHostname);
kafkaProperties.setProperty("advertised.host.name", localHostname);
kafkaProperties.setProperty("advertised.port", kafkaPort);
kafkaProperties.setProperty("advertised.listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort);
kafkaProperties.setProperty("listeners", "PLAINTEXT://" + localHostname + ":" + kafkaPort);
setPropertyIfNotSet(brokerProperties, "host.name", getConfiguredHostname());
setPropertyIfNotSet(brokerProperties, "advertised.host.name", getConfiguredHostname());
setPropertyIfNotSet(brokerProperties, "advertised.port", kafkaPort);
setPropertyIfNotSet(brokerProperties, "advertised.listeners", "PLAINTEXT://" + getConfiguredHostname() + ":" + kafkaPort);
setPropertyIfNotSet(brokerProperties, "listeners", "PLAINTEXT://" + getConfiguredHostname() + ":" + kafkaPort);

// Set other defaults if not defined.
setPropertyIfNotSet(brokerProperties, "auto.create.topics.enable", "true");
setPropertyIfNotSet(brokerProperties, "zookeeper.session.timeout.ms", "30000");
setPropertyIfNotSet(brokerProperties, "broker.id", "1");
setPropertyIfNotSet(brokerProperties, "auto.offset.reset", "latest");

// Lower active threads.
kafkaProperties.setProperty("num.io.threads", "2");
kafkaProperties.setProperty("num.network.threads", "2");
kafkaProperties.setProperty("log.flush.interval.messages", "1");
setPropertyIfNotSet(brokerProperties, "num.io.threads", "2");
setPropertyIfNotSet(brokerProperties, "num.network.threads", "2");
setPropertyIfNotSet(brokerProperties, "log.flush.interval.messages", "1");

// Define replication factor for internal topics to 1
kafkaProperties.setProperty("offsets.topic.replication.factor", "1");
kafkaProperties.setProperty("offset.storage.replication.factor", "1");
kafkaProperties.setProperty("transaction.state.log.replication.factor", "1");
kafkaProperties.setProperty("transaction.state.log.min.isr", "1");
kafkaProperties.setProperty("transaction.state.log.num.partitions", "4");
kafkaProperties.setProperty("config.storage.replication.factor", "1");
kafkaProperties.setProperty("status.storage.replication.factor", "1");
kafkaProperties.setProperty("default.replication.factor", "1");

final KafkaConfig config = new KafkaConfig(kafkaProperties);
setPropertyIfNotSet(brokerProperties, "offsets.topic.replication.factor", "1");
setPropertyIfNotSet(brokerProperties, "offset.storage.replication.factor", "1");
setPropertyIfNotSet(brokerProperties, "transaction.state.log.replication.factor", "1");
setPropertyIfNotSet(brokerProperties, "transaction.state.log.min.isr", "1");
setPropertyIfNotSet(brokerProperties, "transaction.state.log.num.partitions", "4");
setPropertyIfNotSet(brokerProperties, "config.storage.replication.factor", "1");
setPropertyIfNotSet(brokerProperties, "status.storage.replication.factor", "1");
setPropertyIfNotSet(brokerProperties, "default.replication.factor", "1");

// Create and start kafka service.
final KafkaConfig config = new KafkaConfig(brokerProperties);
kafka = new KafkaServerStartable(config);
getKafkaServer().startup();
}
@@ -191,7 +224,7 @@ public void createTopic(final String topicName, final int partitions) {
final short replicationFactor = 1;

// Create admin client
try (final AdminClient adminClient = KafkaAdminClient.create(buildDefaultClientConfig())) {
try (final AdminClient adminClient = getAdminClient()) {
try {
// Define topic
final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
@@ -219,6 +252,14 @@ public void shutdown() throws Exception {
close();
}

/**
* Creates a Kafka AdminClient connected to our test server.
* @return Kafka AdminClient instance.
*/
public AdminClient getAdminClient() {
return KafkaAdminClient.create(buildDefaultClientConfig());
}

/**
* Creates a kafka producer that is connected to our test server.
* @param <K> Type of message key
@@ -318,12 +359,49 @@ public <K, V> KafkaConsumer<K, V> getKafkaConsumer(
* Internal helper method to build a default configuration.
*/
private Map<String, Object> buildDefaultClientConfig() {
Map<String, Object> defaultClientConfig = Maps.newHashMap();
final Map<String, Object> defaultClientConfig = new HashMap<>();
defaultClientConfig.put("bootstrap.servers", getKafkaConnectString());
defaultClientConfig.put("client.id", "test-consumer-id");
return defaultClientConfig;
}

/**
* Helper method to conditionally set a property if no value is already set.
* @param properties The properties instance to update.
* @param key The key to set if not already set.
* @param defaultValue The value to set if no value is already set for key.
* @return The value set.
*/
private Object setPropertyIfNotSet(final Properties properties, final String key, final String defaultValue) {
// Validate inputs
if (properties == null) {
throw new NullPointerException("properties argument cannot be null.");
}
if (key == null) {
throw new NullPointerException("key argument cannot be null.");
}

// Conditionally set the property if its not already set.
properties.setProperty(
key,
properties.getProperty(key, defaultValue)
);

// Return the value that is being used.
return properties.get(key);
}

/**
* Returns which hostname/IP address Kafka will bind/listen/advertise with. To change this value
* use the constructor: KafkaTestServer(final Properties overrideBrokerProperties) and set the property
* 'host.name' to the appropriate value.
*
* @return Which hostname/IP Kafka should bind/listen/advertise using.
*/
private String getConfiguredHostname() {
return overrideBrokerProperties.getProperty("host.name", DEFAULT_HOSTNAME);
}

/**
* Closes the internal servers. Failing to call this at the end of your tests will likely
* result in leaking instances.
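The defaulting behavior this diff introduces (caller-supplied broker properties win; everything else falls back to a default, and the effective value is returned) can be sketched in isolation with plain `java.util.Properties`. The class below is a standalone sketch mirroring `setPropertyIfNotSet()`, not library code:

```java
import java.util.Properties;

public class DefaultingSketch {
    // Mirrors the setPropertyIfNotSet() helper above: keep any value the
    // caller already set, otherwise install the default; return what is used.
    public static String setPropertyIfNotSet(final Properties properties, final String key, final String defaultValue) {
        if (properties == null) {
            throw new NullPointerException("properties argument cannot be null.");
        }
        if (key == null) {
            throw new NullPointerException("key argument cannot be null.");
        }
        properties.setProperty(key, properties.getProperty(key, defaultValue));
        return properties.getProperty(key);
    }

    public static void main(final String[] args) {
        final Properties overrides = new Properties();
        overrides.setProperty("port", "9092");
        // Caller's value wins; missing keys fall back to the supplied default.
        System.out.println(setPropertyIfNotSet(overrides, "port", "1234"));           // 9092
        System.out.println(setPropertyIfNotSet(overrides, "host.name", "127.0.0.1")); // 127.0.0.1
    }
}
```

This is why every broker setting in `start()` above goes through the helper: an entry in `overrideBrokerProperties` silently replaces the baked-in default, including ports, listeners, and replication factors.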
@@ -26,7 +26,6 @@
package com.salesforce.kafka.test;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -76,18 +75,18 @@ public List<ProducedKafkaRecord<byte[], byte[]>> produceRecords(
final int partitionId
) {
// This holds the records we produced
List<ProducerRecord<byte[], byte[]>> producedRecords = Lists.newArrayList();
final List<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();

// This holds futures returned
List<Future<RecordMetadata>> producerFutures = Lists.newArrayList();
final List<Future<RecordMetadata>> producerFutures = new ArrayList<>();

KafkaProducer<byte[], byte[]> producer = kafkaTestServer.getKafkaProducer(
final KafkaProducer<byte[], byte[]> producer = kafkaTestServer.getKafkaProducer(
ByteArraySerializer.class,
ByteArraySerializer.class
);
for (Map.Entry<byte[], byte[]> entry: keysAndValues.entrySet()) {
for (final Map.Entry<byte[], byte[]> entry: keysAndValues.entrySet()) {
// Construct filter
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, partitionId, entry.getKey(), entry.getValue());
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, partitionId, entry.getKey(), entry.getValue());
producedRecords.add(record);

// Send it.
@@ -100,17 +99,16 @@ public List<ProducedKafkaRecord<byte[], byte[]>> produceRecords(
producer.close();

// Loop thru the futures, and build KafkaRecord objects
List<ProducedKafkaRecord<byte[], byte[]>> kafkaRecords = Lists.newArrayList();
final List<ProducedKafkaRecord<byte[], byte[]>> kafkaRecords = new ArrayList<>();
try {
for (int x = 0; x < keysAndValues.size(); x++) {
final RecordMetadata metadata = producerFutures.get(x).get();
final ProducerRecord<byte[], byte[]> producerRecord = producedRecords.get(x);

kafkaRecords.add(ProducedKafkaRecord.newInstance(metadata, producerRecord));
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (final InterruptedException | ExecutionException exception) {
throw new RuntimeException(exception);
}

return kafkaRecords;
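The future-resolution loop above (send all records first, then block on each `Future` in order and rethrow checked failures as unchecked) can be sketched generically. The helper class below is illustrative only, not part of the library:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureResolveSketch {
    // Resolve every future in order, converting checked failures into a
    // RuntimeException, as the produceRecords() loop above does.
    public static <T> List<T> resolveAll(final List<? extends Future<T>> futures) {
        final List<T> results = new ArrayList<>();
        try {
            for (final Future<T> future : futures) {
                results.add(future.get());
            }
        } catch (final InterruptedException | ExecutionException exception) {
            throw new RuntimeException(exception);
        }
        return results;
    }

    public static void main(final String[] args) {
        final List<Future<Integer>> futures = List.of(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture(2));
        System.out.println(resolveAll(futures)); // [1, 2]
    }
}
```

Wrapping the checked exceptions (rather than calling `printStackTrace()` and continuing, as the old code did) ensures a failed produce call fails the test instead of passing with missing records.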