Skip to content

Commit

Permalink
merge: #1211
Browse files Browse the repository at this point in the history
1211: fix: building the logstream is synchronous r=remcowesterhoud a=lenaschoenburg

Mirrors the changes in Zeebe here: camunda/camunda#20339

Co-authored-by: Lena Schönburg <lena@fairburg.xyz>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg authored Jul 23, 2024
2 parents 8d88e13 + b0e13c3 commit 20c160c
Showing 1 changed file with 7 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBuilder;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.engine.db.InMemoryDbFactory;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.clock.ActorClock;
Expand All @@ -33,7 +31,6 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class EngineFactory {
Expand Down Expand Up @@ -70,7 +67,7 @@ private static ZeebeTestEngine create(final int port, final Consumer<Intent> req
final InMemoryLogStorage logStorage = new InMemoryLogStorage();
final LogStream logStream = createLogStream(logStorage, scheduler, partitionId);

final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter().join());
final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter());
final CommandSender commandSender = new CommandSender(commandWriter);
final GatewayRequestStore gatewayRequestStore = new GatewayRequestStore();
final InMemoryJobStreamer jobStreamer = new InMemoryJobStreamer(commandWriter);
Expand All @@ -97,7 +94,7 @@ private static ZeebeTestEngine create(final int port, final Consumer<Intent> req
final EngineStateMonitor engineStateMonitor =
new EngineStateMonitor(logStorage, streamProcessor);

final LogStreamReader reader = logStream.newLogStreamReader().join();
final LogStreamReader reader = logStream.newLogStreamReader();
final RecordStreamSourceImpl recordStream = new RecordStreamSourceImpl(reader, partitionId);

return new InMemoryEngine(
Expand Down Expand Up @@ -125,29 +122,11 @@ private static ActorScheduler createAndStartActorScheduler(final ActorClock cloc

private static LogStream createLogStream(
final LogStorage logStorage, final ActorSchedulingService scheduler, final int partitionId) {
final LogStreamBuilder builder =
LogStream.builder()
.withPartitionId(partitionId)
.withLogStorage(logStorage)
.withActorSchedulingService(scheduler);

final CompletableFuture<LogStream> theFuture = new CompletableFuture<>();

scheduler.submitActor(
Actor.wrap(
(control) ->
builder
.buildAsync()
.onComplete(
(logStream, failure) -> {
if (failure != null) {
theFuture.completeExceptionally(failure);
} else {
theFuture.complete(logStream);
}
})));

return theFuture.join();
return LogStream.builder()
.withPartitionId(partitionId)
.withLogStorage(logStorage)
.withActorSchedulingService(scheduler)
.build();
}

private static ZeebeDb<ZbColumnFamilies> createDatabase() {
Expand Down

0 comments on commit 20c160c

Please sign in to comment.