Skip to content

Commit

Permalink
Merge #5765
Browse files Browse the repository at this point in the history
5765: [BACKPORT 0.25] Add experimental flag to control inconsistency detection r=menski a=npepinpe

## Description

This PR backports #5761 and adds a flag to control inconsistency detection during reprocessing.

## Related issues

<!-- Which issues are closed by this PR or are related -->

backports #5761 
related to #5759 

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [x] The changes are backwards compatible with previous versions
* [x] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions

Testing:
* [ ] There are unit/integration tests that verify all acceptance criteria of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors[bot] and ChrisKujawa authored Nov 5, 2020
2 parents 5f8a494 + 89af464 commit 7cc99d4
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ public class ExperimentalCfg {
public static final int DEFAULT_MAX_APPENDS_PER_FOLLOWER = 2;
public static final DataSize DEFAULT_MAX_APPEND_BATCH_SIZE = DataSize.ofKilobytes(32);
public static final boolean DEFAULT_DISABLE_EXPLICIT_RAFT_FLUSH = false;
private static final boolean DEFAULT_DETECT_REPROCESSING_INCONSISTENCY = false;

private int maxAppendsPerFollower = DEFAULT_MAX_APPENDS_PER_FOLLOWER;
private DataSize maxAppendBatchSize = DEFAULT_MAX_APPEND_BATCH_SIZE;
private boolean disableExplicitRaftFlush = DEFAULT_DISABLE_EXPLICIT_RAFT_FLUSH;
private boolean detectReprocessingInconsistency = DEFAULT_DETECT_REPROCESSING_INCONSISTENCY;

public int getMaxAppendsPerFollower() {
return maxAppendsPerFollower;
Expand Down Expand Up @@ -53,6 +55,14 @@ public void setDisableExplicitRaftFlush(final boolean disableExplicitRaftFlush)
this.disableExplicitRaftFlush = disableExplicitRaftFlush;
}

/**
 * Returns whether inconsistency detection during reprocessing is enabled.
 *
 * @return true if the stream processor should verify records during reprocessing
 */
public boolean isDetectReprocessingInconsistency() {
return detectReprocessingInconsistency;
}

/**
 * Enables or disables inconsistency detection during reprocessing. Defaults to {@code false}
 * (see DEFAULT_DETECT_REPROCESSING_INCONSISTENCY).
 *
 * @param detectReprocessingInconsistency true to enable detection
 */
public void setDetectReprocessingInconsistency(final boolean detectReprocessingInconsistency) {
this.detectReprocessingInconsistency = detectReprocessingInconsistency;
}

@Override
public String toString() {
return "ExperimentalCfg{"
Expand All @@ -62,6 +72,8 @@ public String toString() {
+ maxAppendBatchSize
+ ", disableExplicitRaftFlush="
+ disableExplicitRaftFlush
+ ", detectReprocessingInconsistency="
+ detectReprocessingInconsistency
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ public String getName() {
}

private StreamProcessor createStreamProcessor(final PartitionContext state) {

return StreamProcessor.builder()
.logStream(state.getLogStream())
.actorScheduler(state.getScheduler())
.zeebeDb(state.getZeebeDb())
.nodeId(state.getNodeId())
.commandResponseWriter(state.getCommandApiService().newCommandResponseWriter())
.detectReprocessingInconsistency(
state.getBrokerCfg().getExperimental().isDetectReprocessingInconsistency())
.onProcessedListener(
state.getCommandApiService().getOnProcessedListener(state.getPartitionId()))
.streamProcessorFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.zeebe.broker.system.configuration.NetworkCfg.DEFAULT_MONITORING_API_PORT;
import static io.zeebe.protocol.Protocol.START_PARTITION_ID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.zeebe.broker.exporter.debug.DebugLogExporter;
Expand Down Expand Up @@ -56,10 +57,12 @@ public final class BrokerCfgTest {
"zeebe.broker.cluster.clusterSize";
private static final String ZEEBE_BROKER_CLUSTER_CLUSTER_NAME =
"zeebe.broker.cluster.clusterName";
private static final String ZEEBE_BROKER_CLUSTER_MAX_APPENDS_PER_FOLLOWER =
private static final String ZEEBE_BROKER_EXPERIMENTAL_MAX_APPENDS_PER_FOLLOWER =
"zeebe.broker.experimental.maxAppendsPerFollower";
private static final String ZEEBE_BROKER_CLUSTER_MAX_APPEND_BATCH_SIZE =
private static final String ZEEBE_BROKER_EXPERIMENTAL_MAX_APPEND_BATCH_SIZE =
"zeebe.broker.experimental.maxAppendBatchSize";
private static final String ZEEBE_BROKER_EXPERIMENTAL_DETECT_REPROCESSING_INCONSISTENCY =
"zeebe.broker.experimental.detectReprocessingInconsistency";
private static final String ZEEBE_BROKER_EXPERIMENTAL_DISABLEEXPLICITRAFTFLUSH =
"zeebe.broker.experimental.disableExplicitRaftFlush";

Expand Down Expand Up @@ -439,7 +442,7 @@ public void shouldOverrideClusterSizeViaEnvironment() {
@Test
public void shouldOverrideMaxAppendsViaEnvironment() {
// given
environment.put(ZEEBE_BROKER_CLUSTER_MAX_APPENDS_PER_FOLLOWER, "8");
environment.put(ZEEBE_BROKER_EXPERIMENTAL_MAX_APPENDS_PER_FOLLOWER, "8");

// when
final BrokerCfg cfg = TestConfigReader.readConfig("cluster-cfg", environment);
Expand All @@ -452,7 +455,7 @@ public void shouldOverrideMaxAppendsViaEnvironment() {
@Test
public void shouldOverrideMaxAppendBatchSizeViaEnvironment() {
// given
environment.put(ZEEBE_BROKER_CLUSTER_MAX_APPEND_BATCH_SIZE, "256KB");
environment.put(ZEEBE_BROKER_EXPERIMENTAL_MAX_APPEND_BATCH_SIZE, "256KB");

// when
final BrokerCfg cfg = TestConfigReader.readConfig("cluster-cfg", environment);
Expand All @@ -462,6 +465,43 @@ public void shouldOverrideMaxAppendBatchSizeViaEnvironment() {
assertThat(experimentalCfg.getMaxAppendBatchSizeInBytes()).isEqualTo(256 * 1024);
}

@Test
public void shouldDisableDetectReprocessingInconsistencyPerDefault() {
  // given - the default configuration, with no overrides applied
  final BrokerCfg cfg = TestConfigReader.readConfig("default", environment);

  // when
  final ExperimentalCfg experimentalCfg = cfg.getExperimental();

  // then
  // BUGFIX: the original asserted isDisableExplicitRaftFlush(), so the property this
  // test is named after was never checked; assert the reprocessing-detection default.
  assertThat(experimentalCfg.isDetectReprocessingInconsistency()).isFalse();
}

@Test
public void shouldOverrideDetectReprocessingInconsistencySettingViaEnvironment() {
  // given - the experimental flag is enabled through an environment variable
  environment.put(ZEEBE_BROKER_EXPERIMENTAL_DETECT_REPROCESSING_INCONSISTENCY, "true");

  // when - the configuration is read back with the override in place
  final ExperimentalCfg experimental =
      TestConfigReader.readConfig("cluster-cfg", environment).getExperimental();

  // then - the parsed configuration reflects the environment override
  assertThat(experimental.isDetectReprocessingInconsistency()).isTrue();
}

@Test
public void
shouldThrowExceptionWhenInvalidValueIsUsedForDetectReprocessingInconsistencySettingViaEnvironment() {
// given - a value that cannot be bound to a boolean
environment.put(ZEEBE_BROKER_EXPERIMENTAL_DETECT_REPROCESSING_INCONSISTENCY, "XXX");

// when / then - reading the config must fail fast with a binding error
assertThatThrownBy(() -> TestConfigReader.readConfig("default", environment))
.hasMessageContaining(
"Failed to bind properties under 'zeebe.broker.experimental.detect-reprocessing-inconsistency' to boolean");
}

@Test
public void shouldOverrideDisableExplicitRaftFlushViaEnvironment() {
// given
Expand Down
5 changes: 5 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,8 @@
# Sets the maximum batch size, which is sent per append request to a follower.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_MAX_APPEND_BATCH_SIZE
# maxAppendBatchSize = 32KB;

# Enables the detection of inconsistencies during reprocessing. If an inconsistency is detected, the StreamProcessor
# fails and the partition becomes unhealthy; no further progress will be made on that specific partition.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_DETECT_REPROCESSING_INCONSISTENCY
# detectReprocessingInconsistency = false;
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class ProcessingContext implements ReadonlyProcessingContext {
private BooleanSupplier abortCondition;
private Consumer<TypedRecord> onProcessedListener = record -> {};
private int maxFragmentSize;
private boolean detectReprocessingInconsistency;

public ProcessingContext actor(final ActorControl actor) {
this.actor = actor;
Expand Down Expand Up @@ -102,6 +103,12 @@ public ProcessingContext maxFragmentSize(final int maxFragmentSize) {
return this;
}

/**
 * Enables or disables inconsistency detection during reprocessing; value is read by the
 * reprocessing state machine via {@link #isDetectReprocessingInconsistency()}.
 *
 * <p>NOTE(review): the other fluent setters in this class omit the {@code set} prefix (e.g.
 * {@code actor}, {@code maxFragmentSize}); consider renaming for consistency — callers such as
 * StreamProcessorBuilder would need updating.
 *
 * @param detectReprocessingInconsistency true to enable detection
 * @return this context for chaining
 */
public ProcessingContext setDetectReprocessingInconsistency(
final boolean detectReprocessingInconsistency) {
this.detectReprocessingInconsistency = detectReprocessingInconsistency;
return this;
}

@Override
public ActorControl getActor() {
return actor;
Expand Down Expand Up @@ -165,4 +172,8 @@ public BooleanSupplier getAbortCondition() {
public Consumer<TypedRecord> getOnProcessedListener() {
return onProcessedListener;
}

/**
 * Returns whether inconsistency detection during reprocessing is enabled for this context.
 *
 * @return true if reprocessed records should be verified against the log
 */
public boolean isDetectReprocessingInconsistency() {
return detectReprocessingInconsistency;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public final class ReProcessingStateMachine {
private LoggedEvent currentEvent;
private TypedRecordProcessor eventProcessor;
private ZeebeDbTransaction zeebeDbTransaction;
private boolean detectReprocessingInconsistency;

public ReProcessingStateMachine(final ProcessingContext context) {
actor = context.getActor();
Expand All @@ -131,6 +132,7 @@ public ReProcessingStateMachine(final ProcessingContext context) {

updateStateRetryStrategy = new EndlessRetryStrategy(actor);
processRetryStrategy = new EndlessRetryStrategy(actor);
detectReprocessingInconsistency = context.isDetectReprocessingInconsistency();
}

/**
Expand Down Expand Up @@ -267,7 +269,9 @@ private void reprocessEvent(final LoggedEvent currentEvent) {
recordValues.readRecordValue(currentEvent, metadata.getValueType());
typedEvent.wrap(currentEvent, metadata, value);

verifyRecordMatchesToReprocessing(typedEvent);
if (detectReprocessingInconsistency) {
verifyRecordMatchesToReprocessing(typedEvent);
}

if (currentEvent.getPosition() <= lastSourceEventPosition) {
// don't reprocess records after the last source event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

/**
 * Toggles inconsistency detection during reprocessing; the flag is forwarded to the
 * processing context, where the reprocessing state machine reads it.
 *
 * @param detectReprocessingInconsistency true to enable detection
 * @return this builder for chaining
 */
public StreamProcessorBuilder detectReprocessingInconsistency(
final boolean detectReprocessingInconsistency) {
this.processingContext.setDetectReprocessingInconsistency(detectReprocessingInconsistency);
return this;
}

public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void shouldDetectNoIssues() {
.causedBy(1));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
assertThat(
Expand Down Expand Up @@ -120,7 +120,7 @@ public void shouldDetectDifferentKey() {
.causedBy(1));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
final var streamProcessor = engine.getStreamProcessor(1);
Expand Down Expand Up @@ -152,7 +152,7 @@ public void shouldDetectDifferentIntent() {
.causedBy(1));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
final var streamProcessor = engine.getStreamProcessor(1);
Expand Down Expand Up @@ -187,7 +187,7 @@ public void shouldDetectMissingRecordOnLogStream() {
.causedBy(2));

// when
engine.start();
engine.startWithReprocessingDetection();

// then
final var streamProcessor = engine.getStreamProcessor(1);
Expand Down
Loading

0 comments on commit 7cc99d4

Please sign in to comment.