Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka module #546

Merged
merged 8 commits into from
Jan 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
### Fixed
- Fixed retrieval of Docker host IP when running inside Docker. ([\#479](https://github.com/testcontainers/testcontainers-java/issues/479))

### Changed
- Added Kafka module ([\#546](https://github.com/testcontainers/testcontainers-java/pull/546))

## [1.5.1] - 2017-12-19

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
* An ambassador container is used as a TCP proxy, enabling any TCP port of another linked container to be exposed
* publicly, even if that container does not make the port public itself. The <code>richnorth/ambassador:latest</code>
* container is used (based on HAProxy).
*
* @deprecated use {@link SocatContainer}
*/
@EqualsAndHashCode(callSuper = false)
@Data
@Deprecated
public class AmbassadorContainer<SELF extends AmbassadorContainer<SELF>> extends GenericContainer<SELF> {

private final String otherContainerName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.junit.runner.Description;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.profiler.Profiler;
Expand All @@ -27,8 +24,15 @@
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;

import java.io.File;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -46,7 +50,6 @@ public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> e
* Random identifier which will become part of spawned containers names, so we can shut them down
*/
private final String identifier;
private final Map<String, AmbassadorContainer> ambassadorContainers = new HashMap<>();
private final List<File> composeFiles;
private final Set<String> spawnedContainerIds = new HashSet<>();
private final Set<String> spawnedNetworkIds = new HashSet<>();
Expand All @@ -56,6 +59,10 @@ public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> e
private boolean pull = true;
private boolean tailChildContainers;

private final AtomicInteger nextAmbassadorPort = new AtomicInteger(2000);
private final Map<String, Map<Integer, Integer>> ambassadorPortMappings = new ConcurrentHashMap<>();
private final SocatContainer ambassadorContainer = new SocatContainer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this changes happen in a different PR? It's a quite big one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is little to no activity around DockerComposeContainer, so I would keep it as part of this PR until you have objections :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try this change on Windows tonight 😉

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All good on Docker for Windows 💪


private static final Object MUTEX = new Object();

/**
Expand All @@ -64,12 +71,6 @@ public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> e
*/
private Map<String, String> env = new HashMap<>();

private static final RateLimiter AMBASSADOR_CREATION_RATE_LIMITER = RateLimiterBuilder
.newBuilder()
.withRate(6, TimeUnit.MINUTES)
.withConstantThroughput()
.build();

@Deprecated
public DockerComposeContainer(File composeFile, String identifier) {
this(identifier, composeFile);
Expand Down Expand Up @@ -201,31 +202,9 @@ private List<Container> listChildContainers() {
}

private void startAmbassadorContainers(Profiler profiler) {
for (final Map.Entry<String, AmbassadorContainer> address : ambassadorContainers.entrySet()) {

try {
// Start any ambassador containers we need
profiler.start("Ambassador container startup");

final AmbassadorContainer ambassadorContainer = address.getValue();
Unreliables.retryUntilSuccess(120, TimeUnit.SECONDS, () -> {

AMBASSADOR_CREATION_RATE_LIMITER.doWhenReady(() -> {
Profiler localProfiler = profiler.startNested("Ambassador container: " + ambassadorContainer.getContainerName());

localProfiler.start("Start ambassador container");

ambassadorContainer.start();
});

return null;
});
} catch (Exception e) {
logger().warn("Exception during ambassador container startup!", e);
} finally {
profiler.stop().log();
}
}
profiler.start("Ambassador container startup");
ambassadorContainer.start();
profiler.stop().log();
}

private Logger logger() {
Expand All @@ -237,8 +216,8 @@ public void finished(Description description) {


synchronized (MUTEX) {
// shut down all the ambassador containers
ambassadorContainers.forEach((String address, AmbassadorContainer container) -> container.stop());
// shut down the ambassador container
ambassadorContainer.stop();

// Kill the services using docker-compose
try {
Expand Down Expand Up @@ -270,7 +249,7 @@ public SELF withExposedService(String serviceName, int servicePort) {
}

/*
* For every service/port pair that needs to be exposed, we have to start an 'ambassador container'.
* For every service/port pair that needs to be exposed, we register a target on an 'ambassador container'.
*
* The ambassador container's role is to link (within the Docker network) to one of the
* compose services, and proxy TCP network I/O out to a port that the ambassador container
Expand All @@ -282,13 +261,12 @@ public SELF withExposedService(String serviceName, int servicePort) {
* {@link GenericContainer} should ensure that the ambassador container is on the same network
* as the rest of the compose environment.
*/
AmbassadorContainer ambassadorContainer =
new AmbassadorContainer<>(new FutureContainer(this.identifier + "_" + serviceName), serviceName, servicePort)
.withEnv(env);

// Ambassador containers will all be started together after docker compose has started
ambassadorContainers.put(serviceName + ":" + servicePort, ambassadorContainer);

// Ambassador container will be started together after docker compose has started
int ambassadorPort = nextAmbassadorPort.getAndIncrement();
ambassadorPortMappings.computeIfAbsent(serviceName, __ -> new ConcurrentHashMap<>()).put(servicePort, ambassadorPort);
ambassadorContainer.withTarget(ambassadorPort, serviceName, servicePort);
ambassadorContainer.addLink(new FutureContainer(this.identifier + "_" + serviceName), serviceName);
return self();
}

Expand All @@ -307,7 +285,7 @@ public DockerComposeContainer withExposedService(String serviceName, int instanc
* @return a host IP address or hostname that can be used for accessing the service container.
*/
public String getServiceHost(String serviceName, Integer servicePort) {
return ambassadorContainers.get(serviceName + ":" + servicePort).getContainerIpAddress();
return ambassadorContainer.getContainerIpAddress();
}

/**
Expand All @@ -321,7 +299,7 @@ public String getServiceHost(String serviceName, Integer servicePort) {
* @return a port that can be used for accessing the service container.
*/
public Integer getServicePort(String serviceName, Integer servicePort) {
return ambassadorContainers.get(serviceName + ":" + servicePort).getMappedPort(servicePort);
return ambassadorContainer.getMappedPort(ambassadorPortMappings.get(serviceName).get(servicePort));
}

public SELF withScaledService(String serviceBaseName, int numInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class NetworkImpl extends ExternalResource implements Network {
private final AtomicBoolean initialized = new AtomicBoolean();

@Override
public String getId() {
public synchronized String getId() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nasty bug discovered during my experiments with Kafka module, concurrent access was giving null for the second invocation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried making id volatile. This should work and is cheaper under contention that synchronized method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a performance critical part (being called just a few times), doesn't make sense to over-optimize it :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Train your good habits! :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep it simple 😎

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't volatile as simple? Doesn't really matter for me in this case 😁

if (initialized.compareAndSet(false, true)) {
id = create();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.testcontainers.containers;

import org.testcontainers.utility.Base58;
import org.testcontainers.utility.TestcontainersConfiguration;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* A socat container is used as a TCP proxy, enabling any TCP port of another container to be exposed
* publicly, even if that container does not make the port public itself.
*/
public class SocatContainer extends GenericContainer<SocatContainer> {

private final Map<Integer, String> targets = new HashMap<>();

public SocatContainer() {
super(TestcontainersConfiguration.getInstance().getSocatContainerImage());
withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought - please could we set the container's name to something that people will be able to identify? (i.e. stop people wondering "what's this random socat container?"). A name like "testcontainers-socat-" + base58chars would probably work.

I realise that the image richnorth/ambassador or the random container names it was spawned with were never great in the first place, but we have a chance to do better 😄

withCreateContainerCmdModifier(it -> it.withName("testcontainers-socat-" + Base58.randomString(8)));
}

public SocatContainer withTarget(int exposedPort, String host) {
return withTarget(exposedPort, host, exposedPort);
}

public SocatContainer withTarget(int exposedPort, String host, int internalPort) {
addExposedPort(exposedPort);
targets.put(exposedPort, String.format("%s:%s", host, internalPort));
return self();
}

@Override
protected void configure() {
withCommand("-c",
targets.entrySet().stream()
.map(entry -> "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that strange hostname values won't break the shell if we concat the String like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK any valid hostname should be valid shell argument

.collect(Collectors.joining(" & "))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public String getAmbassadorContainerImage() {
return (String) properties.getOrDefault("ambassador.container.image", "richnorth/ambassador:latest");
}

public String getSocatContainerImage() {
return (String) properties.getOrDefault("socat.container.image", "alpine/socat:latest");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to use :latest :(
https://hub.docker.com/r/alpine/socat/tags/

Even tho there is a tag, it seems to be old and didn't work for me. But I assume it's fine to use latest here because socat itself is supposed to be backward compatible and shouldn't break over the time

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or would it be more responsible to maintain our own image? 😕

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once we get our name on Docker Hub - sure, for now I would stick with Alpine's

}

public String getVncRecordedContainerImage() {
return (String) properties.getOrDefault("vncrecorder.container.image", "richnorth/vnc-recorder:latest");
}
Expand Down
45 changes: 45 additions & 0 deletions modules/kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-parent</artifactId>
<version>0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>kafka</artifactId>
<name>TestContainers :: Apache Kafka</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>testcontainers</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.testcontainers.containers;

import org.testcontainers.utility.Base58;

import java.util.stream.Stream;

public class KafkaContainer extends GenericContainer<KafkaContainer> {

public static final int KAFKA_PORT = 9092;

public static final int ZOOKEEPER_PORT = 2181;

protected String externalZookeeperConnect = null;

protected SocatContainer proxy;

public KafkaContainer() {
this("4.0.0");
}

public KafkaContainer(String confluentPlatformVersion) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I missed something, but don't we need a custom wait strategy? Or is port based wait strategy already enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, Kafka is ready to be used when the port is open

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I at least had some instances, where I got the following error when creating a topic:
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 1 larger than available brokers: 0 (kafka.admin.TopicCommand$).

This did not happen for me, when I used
.waitingFor(new LogMessageWaitStrategy().withRegEx(/(?s).*started \(kafka\.server\.KafkaServer\).*/))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Started Kafka" != "Initialized Kafka", your clients should be able to sync the cluster state before doing operations (and most of them do).

Waiting for cluster initialization instead of just startup will just make the tests slower while it should be handled by your consumers / producers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, in this case we were using the bundled helper scripts, like kafka-topics.sh (but this is mostly a rudiment from having to use kafka with GenericContainer).

super("confluentinc/cp-kafka:" + confluentPlatformVersion);

withNetwork(Network.newNetwork());
String networkAlias = "kafka-" + Base58.randomString(6);
withNetworkAliases(networkAlias);
withExposedPorts(KAFKA_PORT);

// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + networkAlias + ":9093");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
}

public KafkaContainer withEmbeddedZookeeper() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works because confluent images contain zookeeper?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

externalZookeeperConnect = null;
return self();
}

public KafkaContainer withExternalZookeeper(String connectString) {
externalZookeeperConnect = connectString;
return self();
}

public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", proxy.getContainerIpAddress(), proxy.getFirstMappedPort());
}

@Override
public void start() {
String networkAlias = getNetworkAliases().get(0);
proxy = new SocatContainer()
.withNetwork(getNetwork())
.withTarget(9092, networkAlias)
.withTarget(2181, networkAlias);

proxy.start();
withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + networkAlias + ":9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort());

if (externalZookeeperConnect != null) {
withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
} else {
addExposedPort(ZOOKEEPER_PORT);
withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY);
withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run");
}

super.start();
}

@Override
public void stop() {
Stream.<Runnable>of(super::stop, proxy::stop).parallel().forEach(Runnable::run);
}
}
3 changes: 3 additions & 0 deletions modules/kafka/src/main/resources/tc-zookeeper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
clientPort=2181
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/log
Loading