Skip to content

Commit

Permalink
Add Kafka module (#546)
Browse files Browse the repository at this point in the history
* Add Kafka module

* Replace AmbassadorContainer with SocatContainer in DockerComposeContainer

* make it possible to use KafkaContainer with external Zookeeper

* fix typo

* fix Kafka tests

* Add to CHANGELOG.md, name SocatContainer, add listeners explanation comment to KafkaContainer

* listen on alias

* rename myNetworkAlias -> networkAlias
  • Loading branch information
bsideup authored Jan 27, 2018
1 parent 35a6dde commit 8d5f7a9
Show file tree
Hide file tree
Showing 12 changed files with 343 additions and 48 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
### Fixed
- Fixed retrieval of Docker host IP when running inside Docker. ([\#479](https://github.com/testcontainers/testcontainers-java/issues/479))

### Changed
- Added Kafka module ([\#546](https://github.com/testcontainers/testcontainers-java/pull/546))

## [1.5.1] - 2017-12-19

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
* An ambassador container is used as a TCP proxy, enabling any TCP port of another linked container to be exposed
* publicly, even if that container does not make the port public itself. The <code>richnorth/ambassador:latest</code>
* container is used (based on HAProxy).
*
* @deprecated use {@link SocatContainer}
*/
@EqualsAndHashCode(callSuper = false)
@Data
@Deprecated
public class AmbassadorContainer<SELF extends AmbassadorContainer<SELF>> extends GenericContainer<SELF> {

private final String otherContainerName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.junit.runner.Description;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.profiler.Profiler;
Expand All @@ -27,8 +24,15 @@
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;

import java.io.File;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -46,7 +50,6 @@ public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> e
* Random identifier which will become part of spawned containers names, so we can shut them down
*/
private final String identifier;
private final Map<String, AmbassadorContainer> ambassadorContainers = new HashMap<>();
private final List<File> composeFiles;
private final Set<String> spawnedContainerIds = new HashSet<>();
private final Set<String> spawnedNetworkIds = new HashSet<>();
Expand All @@ -56,6 +59,10 @@ public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> e
private boolean pull = true;
private boolean tailChildContainers;

private final AtomicInteger nextAmbassadorPort = new AtomicInteger(2000);
private final Map<String, Map<Integer, Integer>> ambassadorPortMappings = new ConcurrentHashMap<>();
private final SocatContainer ambassadorContainer = new SocatContainer();

private static final Object MUTEX = new Object();

/**
Expand All @@ -64,12 +71,6 @@ public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> e
*/
private Map<String, String> env = new HashMap<>();

private static final RateLimiter AMBASSADOR_CREATION_RATE_LIMITER = RateLimiterBuilder
.newBuilder()
.withRate(6, TimeUnit.MINUTES)
.withConstantThroughput()
.build();

@Deprecated
public DockerComposeContainer(File composeFile, String identifier) {
this(identifier, composeFile);
Expand Down Expand Up @@ -201,31 +202,9 @@ private List<Container> listChildContainers() {
}

private void startAmbassadorContainers(Profiler profiler) {
for (final Map.Entry<String, AmbassadorContainer> address : ambassadorContainers.entrySet()) {

try {
// Start any ambassador containers we need
profiler.start("Ambassador container startup");

final AmbassadorContainer ambassadorContainer = address.getValue();
Unreliables.retryUntilSuccess(120, TimeUnit.SECONDS, () -> {

AMBASSADOR_CREATION_RATE_LIMITER.doWhenReady(() -> {
Profiler localProfiler = profiler.startNested("Ambassador container: " + ambassadorContainer.getContainerName());

localProfiler.start("Start ambassador container");

ambassadorContainer.start();
});

return null;
});
} catch (Exception e) {
logger().warn("Exception during ambassador container startup!", e);
} finally {
profiler.stop().log();
}
}
profiler.start("Ambassador container startup");
ambassadorContainer.start();
profiler.stop().log();
}

private Logger logger() {
Expand All @@ -237,8 +216,8 @@ public void finished(Description description) {


synchronized (MUTEX) {
// shut down all the ambassador containers
ambassadorContainers.forEach((String address, AmbassadorContainer container) -> container.stop());
// shut down the ambassador container
ambassadorContainer.stop();

// Kill the services using docker-compose
try {
Expand Down Expand Up @@ -270,7 +249,7 @@ public SELF withExposedService(String serviceName, int servicePort) {
}

/*
* For every service/port pair that needs to be exposed, we have to start an 'ambassador container'.
* For every service/port pair that needs to be exposed, we register a target on an 'ambassador container'.
*
* The ambassador container's role is to link (within the Docker network) to one of the
* compose services, and proxy TCP network I/O out to a port that the ambassador container
Expand All @@ -282,13 +261,12 @@ public SELF withExposedService(String serviceName, int servicePort) {
* {@link GenericContainer} should ensure that the ambassador container is on the same network
* as the rest of the compose environment.
*/
AmbassadorContainer ambassadorContainer =
new AmbassadorContainer<>(new FutureContainer(this.identifier + "_" + serviceName), serviceName, servicePort)
.withEnv(env);

// Ambassador containers will all be started together after docker compose has started
ambassadorContainers.put(serviceName + ":" + servicePort, ambassadorContainer);

// Ambassador container will be started together after docker compose has started
int ambassadorPort = nextAmbassadorPort.getAndIncrement();
ambassadorPortMappings.computeIfAbsent(serviceName, __ -> new ConcurrentHashMap<>()).put(servicePort, ambassadorPort);
ambassadorContainer.withTarget(ambassadorPort, serviceName, servicePort);
ambassadorContainer.addLink(new FutureContainer(this.identifier + "_" + serviceName), serviceName);
return self();
}

Expand All @@ -307,7 +285,7 @@ public DockerComposeContainer withExposedService(String serviceName, int instanc
* @return a host IP address or hostname that can be used for accessing the service container.
*/
public String getServiceHost(String serviceName, Integer servicePort) {
return ambassadorContainers.get(serviceName + ":" + servicePort).getContainerIpAddress();
return ambassadorContainer.getContainerIpAddress();
}

/**
Expand All @@ -321,7 +299,7 @@ public String getServiceHost(String serviceName, Integer servicePort) {
* @return a port that can be used for accessing the service container.
*/
public Integer getServicePort(String serviceName, Integer servicePort) {
return ambassadorContainers.get(serviceName + ":" + servicePort).getMappedPort(servicePort);
return ambassadorContainer.getMappedPort(ambassadorPortMappings.get(serviceName).get(servicePort));
}

public SELF withScaledService(String serviceBaseName, int numInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class NetworkImpl extends ExternalResource implements Network {
private final AtomicBoolean initialized = new AtomicBoolean();

@Override
public String getId() {
public synchronized String getId() {
if (initialized.compareAndSet(false, true)) {
id = create();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.testcontainers.containers;

import org.testcontainers.utility.Base58;
import org.testcontainers.utility.TestcontainersConfiguration;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* A socat container is used as a TCP proxy, enabling any TCP port of another container to be exposed
* publicly, even if that container does not make the port public itself.
*/
public class SocatContainer extends GenericContainer<SocatContainer> {

private final Map<Integer, String> targets = new HashMap<>();

public SocatContainer() {
super(TestcontainersConfiguration.getInstance().getSocatContainerImage());
withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh"));
withCreateContainerCmdModifier(it -> it.withName("testcontainers-socat-" + Base58.randomString(8)));
}

public SocatContainer withTarget(int exposedPort, String host) {
return withTarget(exposedPort, host, exposedPort);
}

public SocatContainer withTarget(int exposedPort, String host, int internalPort) {
addExposedPort(exposedPort);
targets.put(exposedPort, String.format("%s:%s", host, internalPort));
return self();
}

@Override
protected void configure() {
withCommand("-c",
targets.entrySet().stream()
.map(entry -> "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue())
.collect(Collectors.joining(" & "))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public String getAmbassadorContainerImage() {
return (String) properties.getOrDefault("ambassador.container.image", "richnorth/ambassador:latest");
}

public String getSocatContainerImage() {
return (String) properties.getOrDefault("socat.container.image", "alpine/socat:latest");
}

public String getVncRecordedContainerImage() {
return (String) properties.getOrDefault("vncrecorder.container.image", "richnorth/vnc-recorder:latest");
}
Expand Down
45 changes: 45 additions & 0 deletions modules/kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-parent</artifactId>
<version>0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>kafka</artifactId>
<name>TestContainers :: Apache Kafka</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>testcontainers</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.testcontainers.containers;

import org.testcontainers.utility.Base58;

import java.util.stream.Stream;

public class KafkaContainer extends GenericContainer<KafkaContainer> {

public static final int KAFKA_PORT = 9092;

public static final int ZOOKEEPER_PORT = 2181;

protected String externalZookeeperConnect = null;

protected SocatContainer proxy;

public KafkaContainer() {
this("4.0.0");
}

public KafkaContainer(String confluentPlatformVersion) {
super("confluentinc/cp-kafka:" + confluentPlatformVersion);

withNetwork(Network.newNetwork());
String networkAlias = "kafka-" + Base58.randomString(6);
withNetworkAliases(networkAlias);
withExposedPorts(KAFKA_PORT);

// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + networkAlias + ":9093");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
}

public KafkaContainer withEmbeddedZookeeper() {
externalZookeeperConnect = null;
return self();
}

public KafkaContainer withExternalZookeeper(String connectString) {
externalZookeeperConnect = connectString;
return self();
}

public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", proxy.getContainerIpAddress(), proxy.getFirstMappedPort());
}

@Override
public void start() {
String networkAlias = getNetworkAliases().get(0);
proxy = new SocatContainer()
.withNetwork(getNetwork())
.withTarget(9092, networkAlias)
.withTarget(2181, networkAlias);

proxy.start();
withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + networkAlias + ":9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort());

if (externalZookeeperConnect != null) {
withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
} else {
addExposedPort(ZOOKEEPER_PORT);
withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY);
withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run");
}

super.start();
}

@Override
public void stop() {
Stream.<Runnable>of(super::stop, proxy::stop).parallel().forEach(Runnable::run);
}
}
3 changes: 3 additions & 0 deletions modules/kafka/src/main/resources/tc-zookeeper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
clientPort=2181
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/log
Loading

0 comments on commit 8d5f7a9

Please sign in to comment.