Skip to content

Commit

Permalink
merge: #10584
Browse files Browse the repository at this point in the history
10584: [Backport release-8.1.0]: Remove actor#submit from ScheduleService r=Zelldon a=Zelldon

## Description


Backports #10554 to the release branch, since I think it makes sense to have in the release as well. Afterwards I have to recreate the benchmarks and rerun the e2e tests.
<!-- Please explain the changes you made here. -->

I had to do cherry-pick manually since there were conflicts with the ProcessingScheduleServiceIntegrationTest (we delete them in this PR). No other commit was harmed. 

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10291



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and ChrisKujawa authored Sep 30, 2022
2 parents c7ff8cd + 1de79b6 commit 76a9ce9
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,10 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.logstreams.log.LogStream;

public interface ReadonlyStreamProcessorContext {

ProcessingScheduleService getScheduleService();

/**
* @return the logstream, on which the processor runs
*/
@Deprecated // only used in EngineRule; TODO remove this
LogStream getLogStream();

/**
* Returns the partition ID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void useActorControl(final Runnable task) {
LOG.debug("ProcessingScheduleService hasn't been opened yet, ignore scheduled task.");
return;
}
actorControl.submit(task);
task.run();
}

public ActorFuture<Void> open(final ActorControl control) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,15 @@ public ProcessingScheduleService getScheduleService() {
return processingScheduleService;
}

@Override
public LogStream getLogStream() {
return logStream;
}

@Override
public int getPartitionId() {
return getLogStream().getPartitionId();
}

public LogStream getLogStream() {
return logStream;
}

public MutableLastProcessedPositionState getLastProcessedPositionState() {
return lastProcessedPositionState;
}
Expand Down
55 changes: 37 additions & 18 deletions engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.engine.util.client.VariableClient;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
Expand Down Expand Up @@ -69,6 +69,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -195,7 +197,9 @@ private void startProcessors() {
partitionId, interPartitionCommandSender),
jobsAvailableCallback,
featureFlags)
.withListener(new ProcessingExporterTransistor())
.withListener(
new ProcessingExporterTransistor(
environmentRule.getLogStream(partitionId)))
.withListener(reprocessingCompletedListener),
Optional.of(
new StreamProcessorListener() {
Expand Down Expand Up @@ -423,28 +427,43 @@ private static class ProcessingExporterTransistor implements StreamProcessorLife

private LogStreamReader logStreamReader;
private TypedRecordImpl typedEvent;
private final SynchronousLogStream synchronousLogStream;
private final ExecutorService executorService;

public ProcessingExporterTransistor(final SynchronousLogStream synchronousLogStream) {
this.synchronousLogStream = synchronousLogStream;
executorService = Executors.newSingleThreadExecutor();
}

@Override
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final int partitionId = context.getPartitionId();
typedEvent = new TypedRecordImpl(partitionId);
final var scheduleService = context.getScheduleService();

final LogStream logStream = context.getLogStream();
logStream.registerRecordAvailableListener(
() -> scheduleService.runDelayed(Duration.ZERO, this::onNewEventCommitted));
logStream
.newLogStreamReader()
.onComplete(
((reader, throwable) -> {
if (throwable == null) {
logStreamReader = reader;
onNewEventCommitted();
}
}));
executorService.submit(
() -> {
final int partitionId = context.getPartitionId();
typedEvent = new TypedRecordImpl(partitionId);
final var asyncLogStream = synchronousLogStream.getAsyncLogStream();
asyncLogStream.registerRecordAvailableListener(this::onNewEventCommitted);
logStreamReader = asyncLogStream.newLogStreamReader().join();
exportEvents();
});
}

@Override
public void onClose() {
executorService.shutdownNow();
}

private void onNewEventCommitted() {
// this is called from outside (LogStream), so we need to enqueue the task
if (executorService.isShutdown()) {
return;
}

executorService.submit(this::exportEvents);
}

private void exportEvents() {
// we need to skip until onRecovered happened
if (logStreamReader == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -56,6 +57,10 @@ public LogStreamRecordWriter getLogStreamRecordWriter(final int partitionId) {
return streams.getLogStreamRecordWriter(logName);
}

public SynchronousLogStream getLogStream(final int partitionId) {
return streams.getLogStream(getLogName(partitionId));
}

public LogStreamRecordWriter newLogStreamRecordWriter(final int partitionId) {
final String logName = getLogName(partitionId);
return streams.newLogStreamRecordWriter(logName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public StreamProcessor getStreamProcessor(final int partitionId) {
return streamProcessingComposite.getStreamProcessor(partitionId);
}

public SynchronousLogStream getLogStream(final int partitionId) {
return streamProcessingComposite.getLogStream(partitionId);
}

public CommandResponseWriter getCommandResponseWriter() {
return streams.getMockedResponseWriter();
}
Expand Down
Loading

0 comments on commit 76a9ce9

Please sign in to comment.