Skip to content

Commit

Permalink
test(engine): turn off inconsistency detection test
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisKujawa authored and npepinpe committed Nov 5, 2020
1 parent 2a629db commit 325c0c9
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public ProcessingContext maxFragmentSize(final int maxFragmentSize) {
return this;
}

public ProcessingContext setDetectReprocessingInconsistency(final boolean detectReprocessingInconsistency) {
public ProcessingContext setDetectReprocessingInconsistency(
final boolean detectReprocessingInconsistency) {
this.detectReprocessingInconsistency = detectReprocessingInconsistency;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ private void reprocessEvent(final LoggedEvent currentEvent) {
verifyRecordMatchesToReprocessing(typedEvent);
}


if (currentEvent.getPosition() <= lastSourceEventPosition) {
// don't reprocess records after the last source event
reprocessingStreamWriter.configureSourceContext(currentEvent.getPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

public StreamProcessorBuilder detectReprocessingInconsistency(final boolean detectReprocessingInconsistency) {
public StreamProcessorBuilder detectReprocessingInconsistency(
final boolean detectReprocessingInconsistency) {
this.processingContext.setDetectReprocessingInconsistency(detectReprocessingInconsistency);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void shouldDetectNoIssues() {
.causedBy(1));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
assertThat(
Expand Down Expand Up @@ -120,7 +120,7 @@ public void shouldDetectDifferentKey() {
.causedBy(1));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
final var streamProcessor = engine.getStreamProcessor(1);
Expand Down Expand Up @@ -152,7 +152,7 @@ public void shouldDetectDifferentIntent() {
.causedBy(1));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
final var streamProcessor = engine.getStreamProcessor(1);
Expand Down Expand Up @@ -187,7 +187,7 @@ public void shouldDetectMissingRecordOnLogStream() {
.causedBy(2));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
final var streamProcessor = engine.getStreamProcessor(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.engine.processing.streamprocessor;

import static org.assertj.core.api.Assertions.assertThat;

import io.zeebe.engine.util.EngineRule;
import io.zeebe.engine.util.RecordToWrite;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.protocol.record.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.record.value.BpmnElementType;
import io.zeebe.protocol.record.value.JobRecordValue;
import io.zeebe.protocol.record.value.WorkflowInstanceRecordValue;
import io.zeebe.test.util.record.RecordingExporter;
import io.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.zeebe.util.health.HealthStatus;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public final class ReprocessingIssueDetectionTurnedOffTest {

@Rule public final EngineRule engine = EngineRule.singlePartition();

@Rule
public final RecordingExporterTestWatcher recordingExporterTestWatcher =
new RecordingExporterTestWatcher();

private long workflowInstanceKey;
private Record<JobRecordValue> jobCreated;
private Record<WorkflowInstanceRecordValue> serviceTaskActivated;
private Record<WorkflowInstanceRecordValue> processActivated;

@Before
public void setup() {
engine
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess("process")
.startEvent()
.serviceTask("task", t -> t.zeebeJobType("test"))
.done())
.deploy();

workflowInstanceKey = engine.workflowInstance().ofBpmnProcessId("process").create();

processActivated =
RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_ACTIVATED)
.withElementType(BpmnElementType.PROCESS)
.getFirst();

jobCreated = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();

serviceTaskActivated =
RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_ACTIVATED)
.withElementType(BpmnElementType.SERVICE_TASK)
.getFirst();

engine.stop();
}

@Test
public void shouldNotDetectDifferentKey() {
// given
engine.writeRecords(
RecordToWrite.command()
.job(JobIntent.COMPLETE, jobCreated.getValue())
.key(jobCreated.getKey()),
RecordToWrite.event()
.job(JobIntent.COMPLETED, jobCreated.getValue())
.key(jobCreated.getKey())
.causedBy(0),
// expected the key to be serviceTaskActivated.getKey()
RecordToWrite.event()
.workflowInstance(
WorkflowInstanceIntent.ELEMENT_COMPLETING, serviceTaskActivated.getValue())
.key(123L)
.causedBy(1));

// when
engine.start();

// then
engine.awaitReprocessingCompleted();

final var streamProcessor = engine.getStreamProcessor(1);
assertThat(streamProcessor.isFailed()).isFalse();
assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.HEALTHY);
}

@Test
public void shouldNotDetectDifferentIntent() {
// given
engine.writeRecords(
RecordToWrite.command()
.job(JobIntent.COMPLETE, jobCreated.getValue())
.key(jobCreated.getKey()),
RecordToWrite.event()
.job(JobIntent.COMPLETED, jobCreated.getValue())
.key(jobCreated.getKey())
.causedBy(0),
// expected the intent to be ELEMENT_COMPLETING
RecordToWrite.event()
.workflowInstance(
WorkflowInstanceIntent.ELEMENT_TERMINATING, serviceTaskActivated.getValue())
.key(serviceTaskActivated.getKey())
.causedBy(1));

// when
engine.start();

// then
engine.awaitReprocessingCompleted();

final var streamProcessor = engine.getStreamProcessor(1);
assertThat(streamProcessor.isFailed()).isFalse();
assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.HEALTHY);
}

@Test
public void shouldNotDetectMissingRecordOnLogStream() {
// given
engine.writeRecords(
RecordToWrite.command()
.workflowInstance(WorkflowInstanceIntent.CANCEL, processActivated.getValue())
.key(workflowInstanceKey),
RecordToWrite.event()
.workflowInstance(
WorkflowInstanceIntent.ELEMENT_TERMINATING, processActivated.getValue())
.key(workflowInstanceKey)
.causedBy(0),
// expected the follow-up event with intent ELEMENT_TERMINATING for the service task
RecordToWrite.command()
.job(JobIntent.COMPLETE, jobCreated.getValue())
.key(jobCreated.getKey()),
RecordToWrite.event()
.job(JobIntent.COMPLETED, jobCreated.getValue())
.key(jobCreated.getKey())
.causedBy(2));

// when
engine.start();

// then
engine.awaitReprocessingCompleted();

final var streamProcessor = engine.getStreamProcessor(1);
assertThat(streamProcessor.isFailed()).isFalse();
assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.HEALTHY);
}
}
38 changes: 37 additions & 1 deletion engine/src/test/java/io/zeebe/engine/util/EngineRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -72,6 +73,7 @@
public final class EngineRule extends ExternalResource {

private static final int PARTITION_ID = Protocol.DEPLOYMENT_PARTITION;
private static final int REPROCESSING_TIMEOUT_SEC = 30;
private static final RecordingExporter RECORDING_EXPORTER = new RecordingExporter();
private final StreamProcessorRule environmentRule;
private final RecordingExporterTestWatcher recordingExporterTestWatcher =
Expand All @@ -84,6 +86,8 @@ public final class EngineRule extends ExternalResource {
private final Int2ObjectHashMap<SubscriptionCommandMessageHandler> subscriptionHandlers =
new Int2ObjectHashMap<>();
private ExecutorService subscriptionHandlerExecutor;
private final Map<Integer, ReprocessingCompletedListener> partitionReprocessingCompleteListeners =
new Int2ObjectHashMap<>();

private EngineRule(final int partitionCount) {
this(partitionCount, false);
Expand Down Expand Up @@ -129,12 +133,17 @@ protected void before() {
protected void after() {
subscriptionHandlerExecutor.shutdown();
subscriptionHandlers.clear();
partitionReprocessingCompleteListeners.clear();
}

public void start() {
startProcessors();
}

public void startWithReprocessingDetection() {
startProcessors(true);
}

public void stop() {
forEachPartition(environmentRule::closeStreamProcessor);
}
Expand All @@ -150,6 +159,10 @@ public EngineRule withDeploymentDistributor(final DeploymentDistributor deployme
}

private void startProcessors() {
startProcessors(false);
}

private void startProcessors(final boolean detectReprocessingInconsistency) {
final DeploymentRecord deploymentRecord = new DeploymentRecord();
final UnsafeBuffer deploymentBuffer = new UnsafeBuffer(new byte[deploymentRecord.getLength()]);
deploymentRecord.write(deploymentBuffer, 0);
Expand All @@ -160,6 +173,8 @@ private void startProcessors() {

forEachPartition(
partitionId -> {
final var reprocessingCompletedListener = new ReprocessingCompletedListener();
partitionReprocessingCompleteListeners.put(partitionId, reprocessingCompletedListener);
environmentRule.startTypedStreamProcessor(
partitionId,
(processingContext) ->
Expand All @@ -171,7 +186,9 @@ partitionId, new PartitionCommandSenderImpl()),
deploymentDistributor,
(key, partition) -> {},
jobsAvailableCallback)
.withListener(new ProcessingExporterTransistor()));
.withListener(new ProcessingExporterTransistor())
.withListener(reprocessingCompletedListener),
detectReprocessingInconsistency);

// sequenialize the commands to avoid concurrency
subscriptionHandlers.put(
Expand All @@ -181,6 +198,12 @@ partitionId, new PartitionCommandSenderImpl()),
});
}

public void awaitReprocessingCompleted() {
partitionReprocessingCompleteListeners
.values()
.forEach(ReprocessingCompletedListener::awaitReprocessingComplete);
}

public void forEachPartition(final Consumer<Integer> partitionIdConsumer) {
int partitionId = PARTITION_ID;
for (int i = 0; i < partitionCount; i++) {
Expand Down Expand Up @@ -346,6 +369,19 @@ private void onNewEventCommitted() {
}
}

private final class ReprocessingCompletedListener implements StreamProcessorLifecycleAware {
private final ActorFuture<Void> reprocessingComplete = new CompletableActorFuture<>();

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
reprocessingComplete.complete(null);
}

public void awaitReprocessingComplete() {
reprocessingComplete.join(REPROCESSING_TIMEOUT_SEC, TimeUnit.SECONDS);
}
}

private final class DeploymentDistributionImpl implements DeploymentDistributor {

private final Map<Long, PendingDeploymentDistribution> pendingDeployments = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,21 @@ public StreamProcessor startTypedStreamProcessor(
}

public StreamProcessor startTypedStreamProcessor(final TypedRecordProcessorFactory factory) {
return startTypedStreamProcessor(partitionId, factory);
return startTypedStreamProcessor(partitionId, factory, false);
}

public StreamProcessor startTypedStreamProcessor(
final int partitionId, final TypedRecordProcessorFactory factory) {
final int partitionId,
final TypedRecordProcessorFactory factory,
final boolean detectReprocessingInconsistency) {
return streams.startStreamProcessor(
getLogName(partitionId),
zeebeDbFactory,
(processingContext -> {
zeebeState = processingContext.getZeebeState();
return factory.createProcessors(processingContext);
}));
}),
detectReprocessingInconsistency);
}

public void pauseProcessing(final int partitionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ public StreamProcessor startTypedStreamProcessor(
}

public StreamProcessor startTypedStreamProcessor(final TypedRecordProcessorFactory factory) {
return startTypedStreamProcessor(startPartitionId, factory);
return startTypedStreamProcessor(startPartitionId, factory, false);
}

public StreamProcessor startTypedStreamProcessor(
final int partitionId, final TypedRecordProcessorFactory factory) {
return streamProcessingComposite.startTypedStreamProcessor(partitionId, factory);
final int partitionId,
final TypedRecordProcessorFactory factory,
final boolean detectReprocessingInconsistency) {
return streamProcessingComposite.startTypedStreamProcessor(
partitionId, factory, detectReprocessingInconsistency);
}

public void pauseProcessing(final int partitionId) {
Expand Down
Loading

0 comments on commit 325c0c9

Please sign in to comment.