Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-enable IdleStateMonitor #87

Merged
merged 1 commit into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private static StreamProcessor createStreamProcessor(
.streamProcessorFactory(
context ->
EngineProcessors.createEngineProcessors(
context,
context.listener(idleStateMonitor),
partitionCount,
subscriptionCommandSenderFactory.createSender(),
new SinglePartitionDeploymentDistributor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
import io.camunda.zeebe.logstreams.storage.LogStorage;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

final class IdleStateMonitor implements LogStorage.CommitListener, StreamProcessorListener {

private static final Timer TIMER = new Timer();
private final List<Runnable> callbacks = new ArrayList<>();
private final LogStreamReader reader;
private long lastEventPosition = -1L;
private long lastProcessedPosition = -1;
private volatile long lastEventPosition = -1L;
private volatile long lastProcessedPosition = -1L;

private volatile TimerTask task =
remcowesterhoud marked this conversation as resolved.
Show resolved Hide resolved
createTask(); // must never be null for the synchronization to work

IdleStateMonitor(final InMemoryLogStorage logStorage, final LogStreamReader logStreamReader) {
logStorage.addCommitListener(this);
Expand All @@ -30,9 +36,18 @@ public void addCallback(final Runnable callback) {

private void checkIdleState() {
if (isInIdleState()) {
remcowesterhoud marked this conversation as resolved.
Show resolved Hide resolved
synchronized (callbacks) {
callbacks.forEach(Runnable::run);
callbacks.clear();
synchronized (task) {
remcowesterhoud marked this conversation as resolved.
Show resolved Hide resolved
task = createTask();
try {
TIMER.schedule(task, 10);
} catch (IllegalStateException e) {
// thrown - among others - if task was cancelled before it could be scheduled
// do nothing in this case
}
}
} else {
synchronized (task) {
remcowesterhoud marked this conversation as resolved.
Show resolved Hide resolved
task.cancel();
}
}
}
Expand All @@ -43,8 +58,10 @@ private boolean isInIdleState() {
}

private void forwardToLastEvent() {
while (reader.hasNext()) {
lastEventPosition = reader.next().getPosition();
synchronized (reader) {
while (reader.hasNext()) {
lastEventPosition = reader.next().getPosition();
}
}
}

Expand All @@ -55,19 +72,33 @@ public void onCommit() {

@Override
public void onProcessed(final TypedRecord<?> processedCommand) {
lastProcessedPosition = processedCommand.getPosition();
lastProcessedPosition = Math.max(lastProcessedPosition, processedCommand.getPosition());
checkIdleState();
}

@Override
public void onSkipped(final LoggedEvent skippedRecord) {
lastProcessedPosition = skippedRecord.getPosition();
lastProcessedPosition = Math.max(lastProcessedPosition, skippedRecord.getPosition());
checkIdleState();
}

@Override
public void onReplayed(final long lastReplayedEventPosition, final long lastReadRecordPosition) {
lastProcessedPosition = lastReplayedEventPosition;
lastProcessedPosition = Math.max(lastProcessedPosition, lastReplayedEventPosition);
checkIdleState();
}

private TimerTask createTask() {
return new TimerTask() {
@Override
public void run() {
if (isInIdleState()) {
synchronized (callbacks) {
callbacks.forEach(Runnable::run);
callbacks.clear();
}
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.grpc.Server;
import java.io.IOException;
import java.time.Duration;
import org.apache.commons.lang3.NotImplementedException;
import java.util.concurrent.CompletableFuture;

public class InMemoryEngineImpl implements InMemoryEngine {

Expand Down Expand Up @@ -95,17 +95,15 @@ public void increaseTime(final Duration timeToAdd) {

@Override
public void runOnIdleState(final Runnable callback) {
throw new NotImplementedException("This feature is coming soon");
// idleStateMonitor.addCallback(callback);
idleStateMonitor.addCallback(callback);
}

@Override
public void waitForIdleState() {
throw new NotImplementedException("This feature is coming soon");
// final CompletableFuture<Void> idleState = new CompletableFuture<>();
//
// runOnIdleState(() -> idleState.complete(null));
//
// idleState.join();
final CompletableFuture<Void> idleState = new CompletableFuture<>();

runOnIdleState(() -> idleState.complete(null));

idleState.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.camunda.zeebe.bpmnassert.extensions.ZeebeProcessTest;
import io.camunda.zeebe.bpmnassert.testengine.RecordStreamSource;
import io.camunda.zeebe.bpmnassert.util.Utilities;
import io.camunda.zeebe.bpmnassert.util.Utilities.ProcessPackLoopingServiceTask;
import io.camunda.zeebe.bpmnassert.util.Utilities.ProcessPackMultipleTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import io.camunda.zeebe.bpmnassert.extensions.ZeebeProcessTest;
import io.camunda.zeebe.bpmnassert.testengine.InMemoryEngine;
import io.camunda.zeebe.bpmnassert.testengine.RecordStreamSource;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.camunda.zeebe.bpmnassert.extensions.ZeebeProcessTest;
import io.camunda.zeebe.bpmnassert.testengine.InMemoryEngine;
import io.camunda.zeebe.bpmnassert.testengine.RecordStreamSource;
import io.camunda.zeebe.bpmnassert.util.Utilities.ProcessPackLoopingServiceTask;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.camunda.zeebe.bpmnassert.extensions.ZeebeProcessTest;
import io.camunda.zeebe.bpmnassert.testengine.InMemoryEngine;
import io.camunda.zeebe.bpmnassert.testengine.RecordStreamSource;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.camunda.zeebe.bpmnassert.extensions.ZeebeProcessTest;
import io.camunda.zeebe.bpmnassert.inspections.model.InspectedProcessInstance;
import io.camunda.zeebe.bpmnassert.testengine.InMemoryEngine;
import io.camunda.zeebe.bpmnassert.testengine.RecordStreamSource;
import io.camunda.zeebe.bpmnassert.util.Utilities.ProcessPackTimerStartEvent;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.camunda.zeebe.bpmnassert.extensions.ZeebeProcessTest;
import io.camunda.zeebe.bpmnassert.inspections.model.InspectedProcessInstance;
import io.camunda.zeebe.bpmnassert.testengine.InMemoryEngine;
import io.camunda.zeebe.bpmnassert.testengine.RecordStreamSource;
import io.camunda.zeebe.bpmnassert.util.Utilities.ProcessPackCallActivity;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/io/camunda/zeebe/bpmnassert/util/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,8 @@ public static ActivateJobsResponse activateSingleJob(
return client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join();
}

// TODO find a better solution for this
public static void waitForIdleState(final InMemoryEngine engine) {
try {
Thread.sleep(100);
} catch (final InterruptedException e) {
e.printStackTrace();
throw new IllegalStateException("Sleep was interrupted");
}
engine.waitForIdleState();
}

public static PublishMessageResponse sendMessage(
Expand Down Expand Up @@ -151,6 +145,12 @@ public static PublishMessageResponse sendMessage(

public static void increaseTime(final InMemoryEngine engine, final Duration duration) {
engine.increaseTime(duration);
try {
// we need to wait some physical time so that InMemoryEngine has a chance to fire the timers
Thread.sleep(50);
remcowesterhoud marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
// do nothing
}
waitForIdleState(engine);
}

Expand Down