Skip to content

Commit

Permalink
Merge pull request #248 from camunda-cloud/244-refactor-worker-test
Browse files Browse the repository at this point in the history
chore: refactor worker test
  • Loading branch information
pihme authored Mar 10, 2022
2 parents 316f8d7 + 6453fb1 commit d7e2150
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package io.camunda.zeebe.process.test.qa.regular.multithread;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.ProcessInstanceResult;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.extension.ZeebeProcessTest;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.qa.util.Utilities;
import io.camunda.zeebe.process.test.qa.util.Utilities.ProcessPackLoopingServiceTask;
import java.util.Collections;
Expand All @@ -32,6 +34,7 @@ public class WorkerTest {

private ZeebeClient client;
private ZeebeTestEngine engine;
private RecordStream recordStream;

@Test
void testJobsCanBeProcessedAsynchronouslyByWorker()
Expand All @@ -40,27 +43,21 @@ void testJobsCanBeProcessedAsynchronouslyByWorker()
client
.newWorker()
.jobType(ProcessPackLoopingServiceTask.JOB_TYPE)
.handler(
(client, job) -> {
client.newCompleteCommand(job.getKey()).send();
})
.handler((client, job) -> client.newCompleteCommand(job.getKey()).send())
.open();

Utilities.deployProcess(client, ProcessPackLoopingServiceTask.RESOURCE_NAME);
final Map<String, Object> variables =
Collections.singletonMap(ProcessPackLoopingServiceTask.TOTAL_LOOPS, 3);

// when
final ProcessInstanceEvent instanceEvent =
Utilities.startProcessInstance(
final ProcessInstanceResult instanceEvent =
Utilities.startProcessInstanceWithResult(
engine, client, ProcessPackLoopingServiceTask.PROCESS_ID, variables);

// then
BpmnAssert.assertThat(instanceEvent).isStarted();
// TODO: Idle state monitor does not work in this case.
// Might be fixed when switching to the zeebe built-in idle state monitor
Thread.sleep(1000);
BpmnAssert.assertThat(instanceEvent)
assertThat(instanceEvent).isStarted();
assertThat(instanceEvent)
.hasPassedElement(ProcessPackLoopingServiceTask.ELEMENT_ID, 3)
.isCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package io.camunda.zeebe.process.test.qa.testcontainer.multithread;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.ProcessInstanceResult;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.extension.testcontainer.ZeebeProcessTest;
Expand Down Expand Up @@ -51,15 +53,12 @@ void testJobsCanBeProcessedAsynchronouslyByWorker()
Collections.singletonMap(ProcessPackLoopingServiceTask.TOTAL_LOOPS, 3);

// when
final ProcessInstanceEvent instanceEvent =
Utilities.startProcessInstance(
final ProcessInstanceResult instanceEvent =
Utilities.startProcessInstanceWithResult(
engine, client, ProcessPackLoopingServiceTask.PROCESS_ID, variables);

// then
BpmnAssert.assertThat(instanceEvent).isStarted();
// TODO: Idle state monitor does not work in this case.
// Might be fixed when switching to the zeebe built-in idle state monitor
Thread.sleep(1000);
assertThat(instanceEvent).isStarted();
BpmnAssert.assertThat(instanceEvent)
.hasPassedElement(ProcessPackLoopingServiceTask.ELEMENT_ID, 3)
.isCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.ProcessInstanceResult;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
Expand Down Expand Up @@ -80,6 +81,26 @@ public static ProcessInstanceEvent startProcessInstance(
return instanceEvent;
}

public static ProcessInstanceResult startProcessInstanceWithResult(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String processId,
final Map<String, Object> variables)
throws InterruptedException, TimeoutException {
final ProcessInstanceResult instanceResult =
client
.newCreateInstanceCommand()
.bpmnProcessId(processId)
.latestVersion()
.variables(variables)
.withResult()
.send()
.join();

waitForIdleState(engine, Duration.ofSeconds(1));
return instanceResult;
}

public static ActivateJobsResponse activateSingleJob(
final ZeebeClient client, final String jobType) {
return client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join();
Expand Down

0 comments on commit d7e2150

Please sign in to comment.