Skip to content

Commit

Permalink
Merge pull request #76 from camunda-cloud/24-implement-test-engine
Browse files Browse the repository at this point in the history
Implement test engine
  • Loading branch information
remcowesterhoud authored Nov 30, 2021
2 parents 2f8a851 + bb32ec1 commit 6e89e5c
Show file tree
Hide file tree
Showing 36 changed files with 4,195 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

<properties>
<dependency.assertj.version>3.21.0</dependency.assertj.version>
<dependency.awaitility.version>4.1.1</dependency.awaitility.version>
<dependency.errorprone.version>2.10.0</dependency.errorprone.version>
<dependency.eze.version>0.6.0</dependency.eze.version>
<dependency.guava.version>31.0.1-jre</dependency.guava.version>
Expand Down Expand Up @@ -151,6 +152,13 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${dependency.awaitility.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.camunda.zeebe.bpmnassert.filters;

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class JobRecordStreamFilter {

private final Stream<Record<JobRecordValue>> stream;

public JobRecordStreamFilter(final Iterable<Record<JobRecordValue>> records) {
stream = StreamSupport.stream(records.spliterator(), false);
}

public JobRecordStreamFilter(final Stream<Record<JobRecordValue>> stream) {
this.stream = stream;
}

public JobRecordStreamFilter withKey(final long key) {
return new JobRecordStreamFilter(stream.filter(record -> record.getKey() == key));
}

public JobRecordStreamFilter withIntent(final JobIntent intent) {
return new JobRecordStreamFilter(stream.filter(record -> record.getIntent() == intent));
}

public Stream<Record<JobRecordValue>> stream() {
return stream;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.camunda.zeebe.bpmnassert.filters;

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
Expand Down Expand Up @@ -73,6 +74,11 @@ public ProcessInstanceRecordStreamFilter withBpmnProcessId(final String bpmnProc
stream.filter(record -> record.getValue().getBpmnProcessId().equals(bpmnProcessId)));
}

public ProcessInstanceRecordStreamFilter withRecordType(final RecordType recordType) {
return new ProcessInstanceRecordStreamFilter(
stream.filter(record -> record.getRecordType() == recordType));
}

public Stream<Record<ProcessInstanceRecordValue>> stream() {
return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ public static ProcessEventRecordStreamFilter processEventRecords(
final RecordStreamSource recordStreamSource) {
return new ProcessEventRecordStreamFilter(recordStreamSource.records());
}

public static JobRecordStreamFilter jobRecords(final RecordStreamSource recordStreamSource) {
return new JobRecordStreamFilter(recordStreamSource.jobRecords());
}

public static TimerRecordStreamFilter timerRecords(final RecordStreamSource recordStreamSource) {
return new TimerRecordStreamFilter(recordStreamSource.timerRecords());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.camunda.zeebe.bpmnassert.filters;

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.protocol.record.value.TimerRecordValue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TimerRecordStreamFilter {

private final Stream<Record<TimerRecordValue>> stream;

public TimerRecordStreamFilter(final Iterable<Record<TimerRecordValue>> records) {
stream = StreamSupport.stream(records.spliterator(), false);
}

public TimerRecordStreamFilter(final Stream<Record<TimerRecordValue>> stream) {
this.stream = stream;
}

public TimerRecordStreamFilter withIntent(final TimerIntent intent) {
return new TimerRecordStreamFilter(stream.filter(record -> record.getIntent() == intent));
}

public Stream<Record<TimerRecordValue>> stream() {
return stream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.camunda.zeebe.bpmnassert.testengine;

import io.camunda.zeebe.bpmnassert.testengine.db.InMemoryDbFactory;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.logstreams.log.*;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.util.concurrent.CompletableFuture;

public class EngineFactory {

public InMemoryEngine create() {
final int partitionId = 1;
final int partitionCount = 1;
final int port = SocketUtil.getNextAddress().getPort();

final ControlledActorClock clock = createActorClock();
final ActorScheduler scheduler = createAndStartActorScheduler(clock);

final InMemoryLogStorage logStorage = new InMemoryLogStorage();
final LogStream logStream = createLogStream(logStorage, scheduler, partitionId);
final LogStreamRecordWriter streamWriter = logStream.newLogStreamRecordWriter().join();

final SubscriptionCommandSenderFactory subscriptionCommandSenderFactory =
new SubscriptionCommandSenderFactory(streamWriter, partitionId);

final GrpcToLogStreamGateway gateway =
new GrpcToLogStreamGateway(streamWriter, partitionId, partitionCount, port);
final Server grpcServer = ServerBuilder.forPort(port).addService(gateway).build();
final GrpcResponseWriter grpcResponseWriter = new GrpcResponseWriter(gateway);

final ZeebeDb<ZbColumnFamilies> zeebeDb = createDatabase();

final IdleStateMonitor idleStateMonitor =
new IdleStateMonitor(logStorage, logStream.newLogStreamReader().join());

final StreamProcessor streamProcessor =
createStreamProcessor(
logStream,
zeebeDb,
scheduler,
grpcResponseWriter,
idleStateMonitor,
partitionCount,
subscriptionCommandSenderFactory);

final LogStreamReader reader = logStream.newLogStreamReader().join();
final RecordStreamSourceImpl recordStream = new RecordStreamSourceImpl(reader, partitionId);

return new InMemoryEngineImpl(
grpcServer,
streamProcessor,
gateway,
zeebeDb,
logStream,
scheduler,
recordStream,
clock,
idleStateMonitor);
}

private ControlledActorClock createActorClock() {
return new ControlledActorClock();
}

private ActorScheduler createAndStartActorScheduler(final ActorClock clock) {
final ActorScheduler scheduler =
ActorScheduler.newActorScheduler().setActorClock(clock).build();
scheduler.start();
return scheduler;
}

private LogStream createLogStream(
final LogStorage logStorage, final ActorSchedulingService scheduler, final int partitionId) {
final LogStreamBuilder builder =
LogStream.builder()
.withPartitionId(partitionId)
.withLogStorage(logStorage)
.withActorSchedulingService(scheduler);

final CompletableFuture<LogStream> theFuture = new CompletableFuture<>();

scheduler.submitActor(
Actor.wrap(
(control) ->
builder
.buildAsync()
.onComplete(
(logStream, failure) -> {
if (failure != null) {
theFuture.completeExceptionally(failure);
} else {
theFuture.complete(logStream);
}
})));

return theFuture.join();
}

private ZeebeDb<ZbColumnFamilies> createDatabase() {
final InMemoryDbFactory<ZbColumnFamilies> factory = new InMemoryDbFactory<>();
return factory.createDb();
}

private StreamProcessor createStreamProcessor(
final LogStream logStream,
final ZeebeDb<ZbColumnFamilies> database,
final ActorSchedulingService scheduler,
final GrpcResponseWriter grpcResponseWriter,
final IdleStateMonitor idleStateMonitor,
final int partitionCount,
final SubscriptionCommandSenderFactory subscriptionCommandSenderFactory) {
return StreamProcessor.builder()
.logStream(logStream)
.zeebeDb(database)
.eventApplierFactory(EventAppliers::new)
.commandResponseWriter(grpcResponseWriter)
.streamProcessorFactory(
context ->
EngineProcessors.createEngineProcessors(
context.listener(idleStateMonitor),
partitionCount,
subscriptionCommandSenderFactory.createSender(),
new SinglePartitionDeploymentDistributor(),
new SinglePartitionDeploymentResponder(),
jobType -> {}))
.actorSchedulingService(scheduler)
.build();
}
}
Loading

0 comments on commit 6e89e5c

Please sign in to comment.