Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar: add flag to enable transactions and set configuration #5479

Merged
46 changes: 46 additions & 0 deletions docs/modules/pulsar.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,51 @@
# Apache Pulsar Module

Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without external services.

It's based on the official Apache Pulsar docker image, it is recommended to read the [official guide](https://pulsar.apache.org/docs/next/getting-started-docker/).

## Example

Create a `PulsarContainer` to use it in your tests:

<!--codeinclude-->
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

Then you can retrieve the broker and the admin url:

<!--codeinclude-->
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates
<!--/codeinclude-->

## Options

### Configuration
If you need to set Pulsar configuration variables you can use the native APIs and set each variable with `PULSAR_PREFIX_` as prefix.

For example, if you want to enable `brokerDeduplicationEnabled`:

<!--codeinclude-->
[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithEnv
<!--/codeinclude-->

### Pulsar IO

If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker:

<!--codeinclude-->
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
<!--/codeinclude-->

### Pulsar Transactions

If you need to test Pulsar Transactions you can enable the transactions feature:

<!--codeinclude-->
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions
<!--/codeinclude-->


## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
4 changes: 2 additions & 2 deletions modules/pulsar/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description = "Testcontainers :: Pulsar"
dependencies {
api project(':testcontainers')

testImplementation group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.7.4'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.10.0'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.23.1'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.7.4'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.10.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;

/**
* This container wraps Apache Pulsar running in standalone mode
*/
Expand All @@ -15,13 +19,21 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final String METRICS_ENDPOINT = "/metrics";

/**
* See <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java">SystemTopicNames</a>.
*/
private static final String TRANSACTION_TOPIC_ENDPOINT =
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");

@Deprecated
private static final String DEFAULT_TAG = "2.2.0";
private static final String DEFAULT_TAG = "2.10.0";

private boolean functionsWorkerEnabled = false;

private boolean transactionsEnabled = false;

/**
* @deprecated use {@link PulsarContainer(DockerImageName)} instead
*/
Expand All @@ -41,36 +53,55 @@ public PulsarContainer(String pulsarVersion) {
public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));

withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss");
waitingFor(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}

@Override
protected void configure() {
super.configure();

if (functionsWorkerEnabled) {
withCommand("/pulsar/bin/pulsar", "standalone");
waitingFor(
new WaitAllStrategy()
.withStrategy(waitStrategy)
.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1))
);
}
setupCommandAndEnv();
}

public PulsarContainer withFunctionsWorker() {
functionsWorkerEnabled = true;
return this;
}

public PulsarContainer withTransactions() {
transactionsEnabled = true;
return this;
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}

public String getHttpServiceUrl() {
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
}

protected void setupCommandAndEnv() {
String standaloneBaseCommand =
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";

if (!functionsWorkerEnabled) {
standaloneBaseCommand += " --no-functions-worker -nss";
}

withCommand("/bin/bash", "-c", standaloneBaseCommand);

List<WaitStrategy> waitStrategies = new ArrayList<>();
waitStrategies.add(Wait.defaultWaitStrategy());
waitStrategies.add(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}
if (functionsWorkerEnabled) {
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1));
}
final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
waitingFor(compoundedWaitStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

Expand All @@ -19,36 +22,103 @@ public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";

private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.2.0");
private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.10.0");

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
try (
// do not use PULSAR_IMAGE to make the doc looks easier
// constructorWithVersion {
PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.10.0"));
// }
) {
pulsar.start();
// coordinates {
final String pulsarBrokerUrl = pulsar.getPulsarBrokerUrl();
final String httpServiceUrl = pulsar.getHttpServiceUrl();
// }
testPulsarFunctionality(pulsarBrokerUrl);
}
}

@Test
public void envVarsUsage() throws Exception {
try (
// constructorWithEnv {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)
.withEnv("PULSAR_PREFIX_brokerDeduplicationEnabled", "true");
// }
) {
pulsar.start();
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
}
}

@Test
public void shouldNotEnableFunctionsWorkerByDefault() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer("2.5.1")) {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
pulsar.start();

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();

assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
.isInstanceOf(PulsarAdminException.class);
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
.isInstanceOf(PulsarAdminException.class);
}
}
}

@Test
public void shouldWaitForFunctionsWorkerStarted() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer("2.5.1").withFunctionsWorker()) {
try (
// constructorWithFunctionsWorker {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withFunctionsWorker();
// }
) {
pulsar.start();

try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
}
}
}

@Test
public void testTransactions() throws Exception {
try (
// constructorWithTransactions {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions();
// }
) {
pulsar.start();

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertThat(
pulsarAdmin
.topics()
.getList("pulsar/system")
.contains("persistent://pulsar/system/transaction_coordinator_assign-partition-0")
)
.isTrue();
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}

@Test
public void testTransactionsAndFunctionsWorker() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions().withFunctionsWorker()) {
pulsar.start();

assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();) {
assertThat(
pulsarAdmin
.topics()
.getList("pulsar/system")
.contains("persistent://pulsar/system/transaction_coordinator_assign-partition-0")
)
.isTrue();
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}

Expand All @@ -65,4 +135,27 @@ protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception
assertThat(new String(message.getData())).isEqualTo("test containers");
}
}

protected void testTransactionFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).enableTransaction(true).build();
Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.topic("transaction-topic")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test-transaction-sub")
.subscribe();
Producer<String> producer = client
.newProducer(Schema.STRING)
.sendTimeout(0, TimeUnit.SECONDS)
.topic("transaction-topic")
.create()
) {
final Transaction transaction = client.newTransaction().build().get();
producer.newMessage(transaction).value("first").send();
transaction.commit();
Message<String> message = consumer.receive();
assertThat(message.getValue()).isEqualTo("first");
}
}
}