Skip to content

Commit

Permalink
merge: #8858
Browse files Browse the repository at this point in the history
8858: [Backport release-1.4.0-alpha2] fix: avoid transition to inactive when log storage installation fails r=oleschoenburg a=github-actions[bot]

# Description
Backport of #8767 to `release-1.4.0-alpha2`.

relates to #8717

Co-authored-by: Roman <roman.smirnov@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and romansmirnov authored Mar 1, 2022
2 parents 309a8d6 + 75f2886 commit 6dcf2a4
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.HealthMetrics;
import io.camunda.zeebe.broker.system.partitions.impl.RecoverablePartitionTransitionException;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.util.exception.UnrecoverableException;
Expand Down Expand Up @@ -333,6 +334,11 @@ private void onInstallFailure(final Throwable error) {
context.getCurrentTerm(),
error);
handleUnrecoverableFailure(error);
} else if (error instanceof RecoverablePartitionTransitionException) {
LOG.info(
"Aborted installation of partition {}, cause: {}",
context.getPartitionId(),
error.getMessage());
} else {
handleRecoverableFailure();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.system.partitions.impl;

/**
 * Signals that a partition transition was aborted intentionally and should not be treated as a
 * failure, for example when the Raft term changed while the transition was still being installed.
 *
 * <p>Subclass this exception for specific abort reasons so that callers can distinguish an
 * intentional abort from a genuine installation failure with a single {@code instanceof} check.
 */
public class RecoverablePartitionTransitionException extends RuntimeException {

  /**
   * Creates an exception without an underlying cause.
   *
   * @param message human-readable reason why the transition was aborted
   */
  public RecoverablePartitionTransitionException(final String message) {
    super(message);
  }

  /**
   * Creates an exception that preserves the error which led to the abort, so the original stack
   * trace is not lost when an underlying exception is translated into an intentional abort.
   *
   * @param message human-readable reason why the transition was aborted
   * @param cause the underlying error, may be {@code null} if there is none
   */
  public RecoverablePartitionTransitionException(final String message, final Throwable cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.RecoverablePartitionTransitionException;
import io.camunda.zeebe.logstreams.storage.atomix.AtomixLogStorage;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.future.ActorFuture;
Expand Down Expand Up @@ -119,8 +120,7 @@ private Either<Exception, AtomixLogStorage> checkAndCreateAtomixLogStorage(

if (raftTerm != targetTerm) {
return left(
new IllegalStateException(
String.format(WRONG_TERM_ERROR_MSG, targetTerm, raftTerm, context.getPartitionId())));
new LogStorageTermMissmatchException(targetTerm, raftTerm, context.getPartitionId()));
} else {
final var logStorage = AtomixLogStorage.ofPartition(server::openReader, logAppender);
return right(logStorage);
Expand All @@ -140,4 +140,12 @@ public void appendEntry(
lowestPosition, highestPosition));
}
}

/**
 * Raised when the term reported by Raft differs from the term the transition was started for. As
 * a {@link RecoverablePartitionTransitionException} it marks the transition as intentionally
 * aborted rather than failed.
 */
public static final class LogStorageTermMissmatchException
    extends RecoverablePartitionTransitionException {

  private LogStorageTermMissmatchException(
      final long expectedTerm, final long actualTerm, final int partitionId) {
    super(describeMismatch(expectedTerm, actualTerm, partitionId));
  }

  /** Renders the shared wrong-term message for the given terms and partition. */
  private static String describeMismatch(
      final long expectedTerm, final long actualTerm, final int partitionId) {
    return String.format(WRONG_TERM_ERROR_MSG, expectedTerm, actualTerm, partitionId);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -24,6 +25,7 @@
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.impl.RaftPartitionServer;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionTransitionImpl;
import io.camunda.zeebe.broker.system.partitions.impl.RecoverablePartitionTransitionException;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.CriticalComponentsHealthMonitor;
import io.camunda.zeebe.util.health.FailureListener;
Expand Down Expand Up @@ -205,6 +207,35 @@ public void shouldStepDownAfterFailedLeaderTransition() throws InterruptedExcept
order.verify(transition).toFollower(1);
}

@Test
public void shouldNotTriggerTransitionOnPartitionTransitionException()
    throws InterruptedException {
  // given - the partition is still follower in term 1 while Raft already became leader in term 2
  when(ctx.getCurrentRole()).thenReturn(Role.FOLLOWER);
  when(ctx.getCurrentTerm()).thenReturn(1L);
  when(raft.getRole()).thenReturn(Role.LEADER);
  when(raft.term()).thenReturn(2L);

  // and the leader transition completes with a recoverable (intentional abort) exception
  final RecoverablePartitionTransitionException abortedTransition =
      new RecoverablePartitionTransitionException("something went wrong");
  when(transition.toLeader(anyLong()))
      .thenReturn(CompletableActorFuture.completedExceptionally(abortedTransition));

  // when
  schedulerRule.submitActor(partition);
  partition.onNewRole(Role.LEADER, 2);
  schedulerRule.workUntilDone();

  // then - the leader transition is attempted (and is expected to fail) ...
  final InOrder verification = inOrder(transition, raft);
  verification.verify(transition).toLeader(2);
  // ... but the abort triggers neither a step down to inactive nor a follower transition
  verification.verify(raft, times(0)).goInactive();
  verification.verify(transition, times(0)).toFollower(anyLong());
}

@Test
public void shouldGoInactiveAfterFailedFollowerTransition() throws InterruptedException {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.broker.system.partitions.impl.steps;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -16,6 +17,7 @@
import io.atomix.raft.partition.impl.RaftPartitionServer;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.camunda.zeebe.broker.system.partitions.TestPartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStoragePartitionTransitionStep.LogStorageTermMissmatchException;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.storage.atomix.AtomixLogStorage;
import io.camunda.zeebe.util.health.HealthMonitor;
Expand Down Expand Up @@ -49,6 +51,23 @@ void setup() {
step = new LogStoragePartitionTransitionStep();
}

@ParameterizedTest
@EnumSource(
    value = Role.class,
    names = {"INACTIVE", "FOLLOWER", "CANDIDATE"})
void shouldThrowTermMissmatchException(final Role currentRole) {
  // given - a leader transition that was prepared for one term ...
  final long transitionTerm = 1L;
  final long changedRaftTerm = 2L;
  initializeContext(currentRole);
  step.prepareTransition(transitionContext, transitionTerm, Role.LEADER);

  // ... while Raft reports a different term by the time the transition runs
  when(raftServer.getTerm()).thenReturn(changedRaftTerm);

  // when + then - joining the transition future surfaces the term mismatch as its cause
  assertThatThrownBy(
          () -> step.transitionTo(transitionContext, transitionTerm, Role.LEADER).join())
      .hasCauseInstanceOf(LogStorageTermMissmatchException.class);
}

@ParameterizedTest
@MethodSource("provideTransitionsThatShouldCloseExistingLogStorage")
void shouldCloseExistingLogStorage(final Role currentRole, final Role targetRole) {
Expand Down

0 comments on commit 6dcf2a4

Please sign in to comment.