Skip to content

Commit

Permalink
Merge pull request #193 from camunda-cloud/separate-recordstream
Browse files Browse the repository at this point in the history
Isolate shared RecordStreamSource functionality
  • Loading branch information
remcowesterhoud authored Feb 22, 2022
2 parents 34a090c + e388110 commit 29ae84a
Show file tree
Hide file tree
Showing 23 changed files with 269 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface InMemoryEngine {
void stop();

/** @return the {@link RecordStreamSource} of this engine */
RecordStreamSource getRecordStream();
RecordStreamSource getRecordStreamSource();

/** @return a newly created {@link ZeebeClient} */
ZeebeClient createClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,9 @@

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.*;
import io.camunda.zeebe.protocol.record.value.deployment.Process;

public interface RecordStreamSource {

/** @return an iterable of all {@link Record} */
Iterable<Record<?>> records();

/** @return an iterable of all {@link Record<ProcessInstanceRecordValue>} */
Iterable<Record<ProcessInstanceRecordValue>> processInstanceRecords();

/** @return an iterable of all {@link Record<JobRecordValue>} */
Iterable<Record<JobRecordValue>> jobRecords();

/** @return an iterable of all {@link Record<JobBatchRecordValue>} */
Iterable<Record<JobBatchRecordValue>> jobBatchRecords();

/** @return an iterable of all {@link Record<DeploymentRecordValue>} */
Iterable<Record<DeploymentRecordValue>> deploymentRecords();

/** @return an iterable of all {@link Record<Process>} */
Iterable<Record<Process>> processRecords();

/** @return an iterable of all {@link Record<VariableRecordValue>} */
Iterable<Record<VariableRecordValue>> variableRecords();

/** @return an iterable of all {@link Record<VariableDocumentRecordValue>} */
Iterable<Record<VariableDocumentRecordValue>> variableDocumentRecords();

/** @return an iterable of all {@link Record<IncidentRecordValue>} */
Iterable<Record<IncidentRecordValue>> incidentRecords();

/** @return an iterable of all {@link Record<TimerRecordValue>} */
Iterable<Record<TimerRecordValue>> timerRecords();

/** @return an iterable of all {@link Record<MessageRecordValue>} */
Iterable<Record<MessageRecordValue>> messageRecords();

/** @return an iterable of all {@link Record<MessageSubscriptionRecordValue>} */
Iterable<Record<MessageSubscriptionRecordValue>> messageSubscriptionRecords();

/** @return an iterable of all {@link Record<MessageStartEventSubscriptionRecordValue>} */
Iterable<Record<MessageStartEventSubscriptionRecordValue>> messageStartEventSubscriptionRecords();

/** @return an iterable of all {@link Record<ProcessMessageSubscriptionRecordValue>} */
Iterable<Record<ProcessMessageSubscriptionRecordValue>> processMessageSubscriptionRecords();

/**
* Prints all records to the console
*
* @param compact enable compact logging
*/
void print(final boolean compact);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.ProcessInstanceResult;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.inspections.model.InspectedProcessInstance;

public abstract class BpmnAssert {

static ThreadLocal<RecordStreamSource> recordStreamSource = new ThreadLocal<>();
static ThreadLocal<RecordStream> recordStream = new ThreadLocal<>();

public static void initRecordStream(final RecordStreamSource recordStreamSource) {
BpmnAssert.recordStreamSource.set(recordStreamSource);
public static void initRecordStream(final RecordStream recordStream) {
BpmnAssert.recordStream.set(recordStream);
}

public static void resetRecordStream() {
recordStreamSource.remove();
recordStream.remove();
}

public static RecordStreamSource getRecordStreamSource() {
if (recordStreamSource.get() == null) {
public static RecordStream getRecordStreamSource() {
if (recordStream.get() == null) {
throw new AssertionError(
"No RecordStreamSource is set. Please make sure you are using the "
+ "@ZeebeProcessTest annotation. Alternatively, set one manually using "
+ "BpmnAssert.initRecordStream.");
}
return recordStreamSource.get();
return recordStream.get();
}

public static ProcessInstanceAssert assertThat(final ProcessInstanceEvent instanceEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@

import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.Process;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.filters.RecordStream;
import java.util.List;
import org.assertj.core.api.AbstractAssert;

/** Assertions for {@code DeploymentEvent} instances */
public class DeploymentAssert extends AbstractAssert<DeploymentAssert, DeploymentEvent> {

private final RecordStreamSource recordStreamSource;
private final RecordStream recordStream;

public DeploymentAssert(
final DeploymentEvent actual, final RecordStreamSource recordStreamSource) {
public DeploymentAssert(final DeploymentEvent actual, final RecordStream recordStream) {
super(actual, DeploymentAssert.class);
this.recordStreamSource = recordStreamSource;
this.recordStream = recordStream;
}

/**
Expand Down Expand Up @@ -79,7 +78,7 @@ public ProcessAssert extractingProcessByBpmnProcessId(final String bpmnProcessId
bpmnProcessId, matchingProcesses.size(), matchingProcesses)
.hasSize(1);

return new ProcessAssert(matchingProcesses.get(0), recordStreamSource);
return new ProcessAssert(matchingProcesses.get(0), recordStream);
}

/**
Expand All @@ -102,6 +101,6 @@ public ProcessAssert extractingProcessByResourceName(final String resourceName)
resourceName, matchingProcesses.size(), matchingProcesses)
.hasSize(1);

return new ProcessAssert(matchingProcesses.get(0), recordStreamSource);
return new ProcessAssert(matchingProcesses.get(0), recordStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.filters.IncidentRecordStreamFilter;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.filters.StreamFilter;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand All @@ -20,11 +20,11 @@
public class IncidentAssert extends AbstractAssert<IncidentAssert, Long> {
private final String LINE_SEPARATOR = System.lineSeparator();

private final RecordStreamSource recordStreamSource;
private final RecordStream recordStream;

public IncidentAssert(final long incidentKey, final RecordStreamSource recordStreamSource) {
public IncidentAssert(final long incidentKey, final RecordStream recordStream) {
super(incidentKey, IncidentAssert.class);
this.recordStreamSource = recordStreamSource;
this.recordStream = recordStream;
}

/**
Expand Down Expand Up @@ -198,7 +198,7 @@ public IncidentAssert isUnresolved() {
}

private IncidentRecordStreamFilter getIncidentRecords(final IncidentIntent intent) {
return StreamFilter.incident(recordStreamSource)
return StreamFilter.incident(recordStream)
.withRejectionType(RejectionType.NULL_VAL)
.withIncidentKey(actual)
.withIntent(intent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.filters.IncidentRecordStreamFilter;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.filters.StreamFilter;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand All @@ -18,11 +18,11 @@
/** Assertions for {@code ActivatedJob} instances */
public class JobAssert extends AbstractAssert<JobAssert, ActivatedJob> {

private final RecordStreamSource recordStreamSource;
private final RecordStream recordStream;

public JobAssert(final ActivatedJob actual, final RecordStreamSource recordStreamSource) {
public JobAssert(final ActivatedJob actual, final RecordStream recordStream) {
super(actual, JobAssert.class);
this.recordStreamSource = recordStreamSource;
this.recordStream = recordStream;
}

/**
Expand Down Expand Up @@ -138,11 +138,11 @@ public IncidentAssert extractLatestIncident() {
final Record<IncidentRecordValue> latestIncidentRecord =
incidentCreatedRecords.get(incidentCreatedRecords.size() - 1);

return new IncidentAssert(latestIncidentRecord.getKey(), recordStreamSource);
return new IncidentAssert(latestIncidentRecord.getKey(), recordStream);
}

private IncidentRecordStreamFilter getIncidentCreatedRecords() {
return StreamFilter.incident(recordStreamSource)
return StreamFilter.incident(recordStream)
.withRejectionType(RejectionType.NULL_VAL)
.withJobKey(actual.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.filters.StreamFilter;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand All @@ -20,12 +20,11 @@

public class MessageAssert extends AbstractAssert<MessageAssert, PublishMessageResponse> {

private RecordStreamSource recordStreamSource;
private RecordStream recordStream;

protected MessageAssert(
final PublishMessageResponse actual, final RecordStreamSource recordStreamSource) {
protected MessageAssert(final PublishMessageResponse actual, final RecordStream recordStream) {
super(actual, MessageAssert.class);
this.recordStreamSource = recordStreamSource;
this.recordStream = recordStream;
}

/**
Expand All @@ -35,7 +34,7 @@ protected MessageAssert(
*/
public MessageAssert hasBeenCorrelated() {
final boolean isCorrelated =
StreamFilter.processMessageSubscription(recordStreamSource)
StreamFilter.processMessageSubscription(recordStream)
.withMessageKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(ProcessMessageSubscriptionIntent.CORRELATED)
Expand All @@ -57,7 +56,7 @@ public MessageAssert hasBeenCorrelated() {
*/
public MessageAssert hasNotBeenCorrelated() {
final Optional<Record<ProcessMessageSubscriptionRecordValue>> recordOptional =
StreamFilter.processMessageSubscription(recordStreamSource)
StreamFilter.processMessageSubscription(recordStream)
.withMessageKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(ProcessMessageSubscriptionIntent.CORRELATED)
Expand All @@ -84,7 +83,7 @@ public MessageAssert hasNotBeenCorrelated() {
*/
public MessageAssert hasCreatedProcessInstance() {
final boolean isCorrelated =
StreamFilter.messageStartEventSubscription(recordStreamSource)
StreamFilter.messageStartEventSubscription(recordStream)
.withMessageKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(MessageStartEventSubscriptionIntent.CORRELATED)
Expand All @@ -108,7 +107,7 @@ public MessageAssert hasCreatedProcessInstance() {
*/
public MessageAssert hasNotCreatedProcessInstance() {
final Optional<Record<MessageStartEventSubscriptionRecordValue>> recordOptional =
StreamFilter.messageStartEventSubscription(recordStreamSource)
StreamFilter.messageStartEventSubscription(recordStream)
.withMessageKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(MessageStartEventSubscriptionIntent.CORRELATED)
Expand All @@ -135,7 +134,7 @@ public MessageAssert hasNotCreatedProcessInstance() {
*/
public MessageAssert hasExpired() {
final boolean isExpired =
StreamFilter.message(recordStreamSource)
StreamFilter.message(recordStream)
.withKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(MessageIntent.EXPIRED)
Expand All @@ -157,7 +156,7 @@ public MessageAssert hasExpired() {
*/
public MessageAssert hasNotExpired() {
final boolean isExpired =
StreamFilter.message(recordStreamSource)
StreamFilter.message(recordStream)
.withKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(MessageIntent.EXPIRED)
Expand Down Expand Up @@ -187,7 +186,7 @@ public ProcessInstanceAssert extractingProcessInstance() {
actual.getMessageKey(), correlatedProcessInstances.size(), correlatedProcessInstances)
.hasSize(1);

return new ProcessInstanceAssert(correlatedProcessInstances.get(0), recordStreamSource);
return new ProcessInstanceAssert(correlatedProcessInstances.get(0), recordStream);
}

/**
Expand All @@ -196,7 +195,7 @@ public ProcessInstanceAssert extractingProcessInstance() {
* @return List of process instance keys
*/
private List<Long> getProcessInstanceKeysForCorrelatedMessage() {
return StreamFilter.processMessageSubscription(recordStreamSource)
return StreamFilter.processMessageSubscription(recordStream)
.withMessageKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(ProcessMessageSubscriptionIntent.CORRELATED)
Expand All @@ -211,7 +210,7 @@ private List<Long> getProcessInstanceKeysForCorrelatedMessage() {
* @return List of process instance keys
*/
private List<Long> getProcessInstanceKeysForCorrelatedMessageStartEvent() {
return StreamFilter.messageStartEventSubscription(recordStreamSource)
return StreamFilter.messageStartEventSubscription(recordStream)
.withMessageKey(actual.getMessageKey())
.withRejectionType(RejectionType.NULL_VAL)
.withIntent(MessageStartEventSubscriptionIntent.CORRELATED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.client.api.response.Process;
import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.filters.StreamFilter;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand Down Expand Up @@ -31,11 +31,11 @@
*/
public class ProcessAssert extends AbstractAssert<ProcessAssert, Process> {

private final RecordStreamSource recordStreamSource;
private final RecordStream recordStream;

public ProcessAssert(final Process actual, final RecordStreamSource recordStreamSource) {
public ProcessAssert(final Process actual, final RecordStream recordStream) {
super(actual, ProcessAssert.class);
this.recordStreamSource = recordStreamSource;
this.recordStream = recordStream;
}

/**
Expand Down Expand Up @@ -141,7 +141,7 @@ public ProcessAssert hasInstances(final long expectedNumberOfInstances) {
}

private Stream<Record<ProcessInstanceRecordValue>> getRecords() {
return StreamFilter.processInstance(recordStreamSource)
return StreamFilter.processInstance(recordStream)
.withRejectionType(RejectionType.NULL_VAL)
.withElementId(actual.getBpmnProcessId())
.withBpmnElementType(BpmnElementType.PROCESS)
Expand Down
Loading

0 comments on commit 29ae84a

Please sign in to comment.