Skip to content

Commit

Permalink
fix(atomix): do not append invalid entries
Browse files Browse the repository at this point in the history
(cherry picked from commit 34c79d5)
  • Loading branch information
ChrisKujawa authored and github-actions[bot] committed Feb 16, 2021
1 parent 3a2d486 commit e02a798
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 33 deletions.
39 changes: 20 additions & 19 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -700,27 +700,28 @@ private void safeAppendEntry(
if (result.failed()) {
appendListener.onWriteError(new IllegalStateException(result.getErrorMessage()));
raft.transition(Role.FOLLOWER);
}
} else {
append(entry)
.whenComplete(
(indexed, error) -> {
if (error != null) {
appendListener.onWriteError(Throwables.getRootCause(error));
if (!(error instanceof StorageException)) {
// step down. Otherwise the following event can get appended resulting in gaps
log.info(
"Unexpected error occurred while appending to local log, stepping down");
raft.transition(Role.FOLLOWER);
}
} else {
if (indexed.type().equals(ZeebeEntry.class)) {
lastZbEntry = indexed.entry();
}

append(entry)
.whenComplete(
(indexed, error) -> {
if (error != null) {
appendListener.onWriteError(Throwables.getRootCause(error));
if (!(error instanceof StorageException)) {
// step down. Otherwise the following event can get appended resulting in gaps
log.info("Unexpected error occurred while appending to local log, stepping down");
raft.transition(Role.FOLLOWER);
}
} else {
if (indexed.type().equals(ZeebeEntry.class)) {
lastZbEntry = indexed.entry();
appendListener.onWrite(indexed);
replicate(indexed, appendListener);
}

appendListener.onWrite(indexed);
replicate(indexed, appendListener);
}
});
});
}
}

private void replicate(final Indexed<ZeebeEntry> indexed, final AppendListener appendListener) {
Expand Down
21 changes: 7 additions & 14 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.atomix.raft;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import io.atomix.cluster.ClusterMembershipService;
Expand Down Expand Up @@ -70,6 +69,7 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.Description;
Expand Down Expand Up @@ -386,17 +386,10 @@ public Map<String, List<Indexed<?>>> getMemberLogs() {
}

public void awaitSameLogSizeOnAllNodes(final long lastIndex) {
waitUntil(
() -> {
final var lastIndexes =
memberLog.values().stream().distinct().collect(Collectors.toList());
return lastIndexes.size() == 1 && lastIndexes.get(0) == lastIndex;
},
() -> memberLog.toString());
}

private void waitUntil(final BooleanSupplier condition, final Supplier<String> errorMessage) {
waitUntil(condition, 100, errorMessage);
Awaitility.await("awaitSameLogSizeOnAllNodes")
.until(
() -> memberLog.values().stream().distinct().collect(Collectors.toList()),
lastIndexes -> lastIndexes.size() == 1 && lastIndexes.get(0) == lastIndex);
}

private void waitUntil(final BooleanSupplier condition, final int retries) {
Expand Down Expand Up @@ -611,7 +604,7 @@ public void onWrite(final Indexed<ZeebeEntry> indexed) {}

@Override
public void onWriteError(final Throwable error) {
fail("Unexpected write error: " + error.getMessage());
commitFuture.completeExceptionally(error);
}

@Override
Expand All @@ -621,7 +614,7 @@ public void onCommit(final Indexed<ZeebeEntry> indexed) {

@Override
public void onCommitError(final Indexed<ZeebeEntry> indexed, final Throwable error) {
fail("Unexpected write error: " + error.getMessage());
commitFuture.completeExceptionally(error);
}

public long awaitCommit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2020 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.raft;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.atomix.raft.zeebe.EntryValidator;
import io.atomix.raft.zeebe.ValidationResult;
import io.atomix.raft.zeebe.ZeebeEntry;
import java.util.List;
import java.util.function.BiFunction;
import org.junit.Rule;
import org.junit.Test;

public class SingleRaftEntryValidationTest {

private final TestEntryValidator entryValidator = new TestEntryValidator();

@Rule
public RaftRule raftRule = RaftRule.withBootstrappedNodes(1).setEntryValidator(entryValidator);

@Test
public void shouldFailAppendOnInvalidEntry() {
// given
entryValidator.validation = (last, current) -> ValidationResult.failure("invalid");

// when - then expect
assertThatThrownBy(() -> raftRule.appendEntry()).hasMessageContaining("invalid");
}

@Test
public void shouldNotAppendInvalidEntryToLog() throws Exception {
// given
entryValidator.validation = (last, current) -> ValidationResult.failure("invalid");

// when
assertThatThrownBy(() -> raftRule.appendEntry()).hasMessageContaining("invalid");
entryValidator.validation = (last, current) -> ValidationResult.success();
raftRule.awaitNewLeader();
final var commitIndex =
raftRule.appendEntry(); // append another entry to await the commit index

// then
raftRule.awaitCommit(commitIndex);
raftRule.awaitSameLogSizeOnAllNodes(commitIndex);
final var memberLog = raftRule.getMemberLogs();

final var logLength = memberLog.values().stream().map(List::size).findFirst().orElseThrow();
assertThat(logLength).withFailMessage(memberLog.toString()).isEqualTo(3);
}

private static class TestEntryValidator implements EntryValidator {
BiFunction<ZeebeEntry, ZeebeEntry, ValidationResult> validation;

@Override
public ValidationResult validateEntry(final ZeebeEntry lastEntry, final ZeebeEntry entry) {
return validation.apply(lastEntry, entry);
}
}
}

0 comments on commit e02a798

Please sign in to comment.