Skip to content

Commit

Permalink
docs: explain the code
Browse files Browse the repository at this point in the history
  • Loading branch information
pihme committed Jun 14, 2022
1 parent c5c6581 commit a5cd432
Showing 1 changed file with 100 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.camunda.zeebe.process.test.examples;

import static java.util.Collections.singletonMap;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
Expand All @@ -29,7 +31,6 @@
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
Expand All @@ -38,6 +39,16 @@
import org.junit.jupiter.api.Test;

@ZeebeProcessTest
/*
* This annotation {@code import io.camunda.zeebe.process.test.extension.ZeebeProcessTest;}
* is recommended for Java 17+. It uses an embedded engine and is the fastest way
* to run the tests.
*
* For Java 8+ use {@code import io.camunda.zeebe.process.test.extension.testcontainer.ZeebeProcessTest;)
* It will start the embedded engine in a Docker container.
*
* Both implementations are interchangeable
*/
public class PullRequestProcessTest {

private static final String PULL_REQUEST_PROCESS_RESOURCE_NAME = "pr-created.bpmn";
Expand All @@ -54,14 +65,19 @@ public class PullRequestProcessTest {
private static final String MERGE_CODE = "mergeCode";
private static final String DEPLOY_SNAPSHOT = "deploySnapshot";

// injected by ZeebeProcessTest annotation
private ZeebeTestEngine engine;
// injected by ZeebeProcessTest annotation
private ZeebeClient client;

@BeforeEach
void deployProcesses() {
// The embedded engine is completely reset before each test run.

// Therefore, we need to deploy the process each time
final DeploymentEvent deploymentEvent =
deployResources(
client, PULL_REQUEST_PROCESS_RESOURCE_NAME, AUTOMATED_TESTS_PROCESS_RESOURCE_NAME);
deployResources(PULL_REQUEST_PROCESS_RESOURCE_NAME, AUTOMATED_TESTS_PROCESS_RESOURCE_NAME);

BpmnAssert.assertThat(deploymentEvent)
.containsProcessesByResourceName(
PULL_REQUEST_PROCESS_RESOURCE_NAME, AUTOMATED_TESTS_PROCESS_RESOURCE_NAME);
Expand All @@ -70,21 +86,29 @@ void deployProcesses() {
@Test
void testPullRequestCreatedHappyPath() throws InterruptedException, TimeoutException {
// Given
final String prId = "123";
final PublishMessageResponse prCreatedResponse =
sendMessage(engine, client, PR_CREATED_MSG, "", Collections.singletonMap(PR_ID_VAR, prId));
final String pullRequestId = "123";

// When

// -> send message to create process instance
final PublishMessageResponse prCreatedResponse =
sendMessage(PR_CREATED_MSG, "", singletonMap(PR_ID_VAR, pullRequestId));

// -> complete user task; user tasks and service tasks can be tested in similar ways
completeTask(REQUEST_REVIEW);
sendMessage(
engine,
client,
REVIEW_RECEIVED_MSG,
prId,
Collections.singletonMap(REVIEW_RESULT_VAR, "approved"));

// -> send another message to drive the process forward
sendMessage(REVIEW_RECEIVED_MSG, pullRequestId, singletonMap(REVIEW_RESULT_VAR, "approved"));

/* -> on a parallel branch of the process, a sub process is called, which spawns three service
* tasks as part of a multi instance embedded sub process. These lines complete the called
* service tasks
*/
completeTask(AUTOMATED_TESTS_RUN_TESTS);
completeTask(AUTOMATED_TESTS_RUN_TESTS);
completeTask(AUTOMATED_TESTS_RUN_TESTS);

// -> back on the main process, there are two more tasks to complete to reach the end
completeTask(MERGE_CODE);
completeTask(DEPLOY_SNAPSHOT);

Expand All @@ -105,22 +129,23 @@ void testPullRequestCreatedHappyPath() throws InterruptedException, TimeoutExcep
void testRemindReviewer() throws InterruptedException, TimeoutException {
// Given
final String prId = "123";
final PublishMessageResponse prCreatedResponse =
sendMessage(engine, client, PR_CREATED_MSG, "", Collections.singletonMap(PR_ID_VAR, prId));

// When
final PublishMessageResponse prCreatedResponse =
sendMessage(PR_CREATED_MSG, "", singletonMap(PR_ID_VAR, prId));
completeTask(REQUEST_REVIEW);

completeTask(AUTOMATED_TESTS_RUN_TESTS);
completeTask(AUTOMATED_TESTS_RUN_TESTS);
completeTask(AUTOMATED_TESTS_RUN_TESTS);
increaseTime(engine, Duration.ofDays(1));

// This is how you can manipulate the time of the engine to trigger timer events
increaseTime(Duration.ofDays(1));

completeTask(REMIND_REVIEWER);
sendMessage(
engine,
client,
REVIEW_RECEIVED_MSG,
prId,
Collections.singletonMap(REVIEW_RESULT_VAR, "approved"));

sendMessage(REVIEW_RECEIVED_MSG, prId, singletonMap(REVIEW_RESULT_VAR, "approved"));

completeTask(MERGE_CODE);
completeTask(DEPLOY_SNAPSHOT);

Expand All @@ -137,28 +162,25 @@ void testRemindReviewer() throws InterruptedException, TimeoutException {
void testRejectReview() throws InterruptedException, TimeoutException {
// Given
final String prId = "123";
final PublishMessageResponse prCreatedResponse =
sendMessage(engine, client, PR_CREATED_MSG, "", Collections.singletonMap(PR_ID_VAR, prId));

// When
final PublishMessageResponse prCreatedResponse =
sendMessage(PR_CREATED_MSG, "", singletonMap(PR_ID_VAR, prId));

completeTask(REQUEST_REVIEW);

completeTask(AUTOMATED_TESTS_RUN_TESTS);
completeTask(AUTOMATED_TESTS_RUN_TESTS);
completeTask(AUTOMATED_TESTS_RUN_TESTS);
sendMessage(
engine,
client,
REVIEW_RECEIVED_MSG,
prId,
Collections.singletonMap(REVIEW_RESULT_VAR, "rejected"));

sendMessage(REVIEW_RECEIVED_MSG, prId, singletonMap(REVIEW_RESULT_VAR, "rejected"));

completeTask(MAKE_CHANGES);

completeTask(REQUEST_REVIEW);
sendMessage(
engine,
client,
REVIEW_RECEIVED_MSG,
prId,
Collections.singletonMap(REVIEW_RESULT_VAR, "approved"));

sendMessage(REVIEW_RECEIVED_MSG, prId, singletonMap(REVIEW_RESULT_VAR, "approved"));

completeTask(MERGE_CODE);
completeTask(DEPLOY_SNAPSHOT);

Expand All @@ -172,12 +194,7 @@ void testRejectReview() throws InterruptedException, TimeoutException {
.isCompleted();
}

private void completeTask(final String taskId) throws InterruptedException, TimeoutException {
completeTask(engine, client, taskId);
}

private static DeploymentEvent deployResources(
final ZeebeClient client, final String... resources) {
private DeploymentEvent deployResources(final String... resources) {
final DeployResourceCommandStep1 commandStep1 = client.newDeployResourceCommand();

DeployResourceCommandStep1.DeployResourceCommandStep2 commandStep2 = null;
Expand All @@ -192,64 +209,74 @@ private static DeploymentEvent deployResources(
return commandStep2.send().join();
}

private static void waitForIdleState(final ZeebeTestEngine engine, final Duration duration)
/* These two methods deal with the asynchronous nature of the engine. It is recommended
* to wait for an idle state before you assert on the state of the engine. Otherwise, you
* may run into race conditions and flaky tests, depending on whether the engine
* is still busy processing your last commands.
*
* Also note that many of the helper functions used in this test (e.g. {@code sendMessage(..)}
* have a call to this method at the end. This is to ensure that each command sent to the engine
* is fully processed before moving on. Without that you can run into issues, where e.g. you want
* to complete a task, but the task has not been activated yet.
*
* Note that the duration is not like a {@code Thread.sleep()}. The tests will continue as soon as
* an idle state is reached. Only if no idle state is reached during the {@code duration}
* passed in as argument, then a timeout exception will be thrown.
*/
private void waitForIdleState(final Duration duration)
throws InterruptedException, TimeoutException {
engine.waitForIdleState(duration);
}

private static void waitForBusyState(final ZeebeTestEngine engine, final Duration duration)
private void waitForBusyState(final Duration duration)
throws InterruptedException, TimeoutException {
engine.waitForBusyState(duration);
}

private static PublishMessageResponse sendMessage(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String messageName,
final String correlationKey,
final Map<String, Object> variables)
throws InterruptedException, TimeoutException {
return sendMessage(
engine, client, messageName, correlationKey, Duration.ofMinutes(1), variables);
}

private static PublishMessageResponse sendMessage(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String messageName,
final String correlationKey,
final Duration timeToLive,
final Map<String, Object> variables)
private PublishMessageResponse sendMessage(
final String messageName, final String correlationKey, final Map<String, Object> variables)
throws InterruptedException, TimeoutException {
final PublishMessageResponse response =
client
.newPublishMessageCommand()
.messageName(messageName)
.correlationKey(correlationKey)
.timeToLive(timeToLive)
.variables(variables)
.send()
.join();
waitForIdleState(engine, Duration.ofSeconds(1));
waitForIdleState(Duration.ofSeconds(1));
return response;
}

private static void increaseTime(final ZeebeTestEngine engine, final Duration duration)
throws InterruptedException {
private void increaseTime(final Duration duration) throws InterruptedException, TimeoutException {
// this method increases the time in a deterministic manner

/* Process all existing commands to make sure that timer subscriptions related to the process
* so far have been created
*/
waitForIdleState(Duration.ofSeconds(1));

/* Increase time in the engine. This will not take immediate effect, though. There is a
* real-time delay of a couple of ms until the updated time is picked up by the scheduler
*/
engine.increaseTime(duration);

try {
waitForIdleState(engine, Duration.ofSeconds(1));
engine.increaseTime(duration);
waitForBusyState(engine, Duration.ofSeconds(1));
waitForIdleState(engine, Duration.ofSeconds(1));
/* This code assumes that the increase of time will trigger timer events. Therefore, we wait
* until the engine is busy. This means that it started triggering events.
*
* And after that, we wait for it to become idle again. That means it is waiting for new commands
*/
waitForBusyState(Duration.ofSeconds(1));
waitForIdleState(Duration.ofSeconds(1));
} catch (final TimeoutException e) {
// Do nothing. We've waited up to 1 second for processing to start, if it didn't start in this
// time the engine probably has not got anything left to process.
}
}

private static void completeTask(
final ZeebeTestEngine engine, final ZeebeClient client, final String taskId)
throws InterruptedException, TimeoutException {
private void completeTask(final String taskId) throws InterruptedException, TimeoutException {
// TODO rewrite this in a more user-understandable way
final List<Record<JobRecordValue>> records =
StreamFilter.jobRecords(RecordStream.of(engine.getRecordStreamSource()))
.withElementId(taskId)
Expand All @@ -272,6 +299,6 @@ private static void completeTask(
String.format("Tried to complete task `%s`, but it was not found", taskId));
}

waitForIdleState(engine, Duration.ofSeconds(1));
waitForIdleState(Duration.ofSeconds(1));
}
}

0 comments on commit a5cd432

Please sign in to comment.