diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java index b6e2a750..e52e766b 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java @@ -14,14 +14,12 @@ import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer; import io.camunda.zeebe.logstreams.log.LogStream; -import io.camunda.zeebe.logstreams.log.LogStreamBuilder; import io.camunda.zeebe.logstreams.log.LogStreamReader; import io.camunda.zeebe.logstreams.storage.LogStorage; import io.camunda.zeebe.process.test.api.ZeebeTestEngine; import io.camunda.zeebe.process.test.engine.db.InMemoryDbFactory; import io.camunda.zeebe.protocol.ZbColumnFamilies; import io.camunda.zeebe.protocol.record.intent.Intent; -import io.camunda.zeebe.scheduler.Actor; import io.camunda.zeebe.scheduler.ActorScheduler; import io.camunda.zeebe.scheduler.ActorSchedulingService; import io.camunda.zeebe.scheduler.clock.ActorClock; @@ -33,7 +31,6 @@ import java.io.IOException; import java.net.ServerSocket; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; public class EngineFactory { @@ -70,7 +67,7 @@ private static ZeebeTestEngine create(final int port, final Consumer req final InMemoryLogStorage logStorage = new InMemoryLogStorage(); final LogStream logStream = createLogStream(logStorage, scheduler, partitionId); - final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter().join()); + final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter()); final CommandSender commandSender = new CommandSender(commandWriter); final GatewayRequestStore gatewayRequestStore = new GatewayRequestStore(); final InMemoryJobStreamer jobStreamer = new InMemoryJobStreamer(commandWriter); @@ -97,7 +94,7 @@ private static ZeebeTestEngine create(final int port, final Consumer req final EngineStateMonitor engineStateMonitor = new EngineStateMonitor(logStorage, streamProcessor); - final LogStreamReader reader = logStream.newLogStreamReader().join(); + final LogStreamReader reader = logStream.newLogStreamReader(); final RecordStreamSourceImpl recordStream = new RecordStreamSourceImpl(reader, partitionId); return new InMemoryEngine( @@ -125,29 +122,11 @@ private static ActorScheduler createAndStartActorScheduler(final ActorClock cloc private static LogStream createLogStream( final LogStorage logStorage, final ActorSchedulingService scheduler, final int partitionId) { - final LogStreamBuilder builder = - LogStream.builder() - .withPartitionId(partitionId) - .withLogStorage(logStorage) - .withActorSchedulingService(scheduler); - - final CompletableFuture theFuture = new CompletableFuture<>(); - - scheduler.submitActor( - Actor.wrap( - (control) -> - builder - .buildAsync() - .onComplete( - (logStream, failure) -> { - if (failure != null) { - theFuture.completeExceptionally(failure); - } else { - theFuture.complete(logStream); - } - }))); - - return theFuture.join(); + return LogStream.builder() + .withPartitionId(partitionId) + .withLogStorage(logStorage) + .withActorSchedulingService(scheduler) + .build(); } private static ZeebeDb createDatabase() {