Skip to content

Commit

Permalink
test(qa): await until no more marked-for-deletion segments are present
Browse files Browse the repository at this point in the history
(cherry picked from commit 6fc433d)
  • Loading branch information
npepinpe authored and github-actions[bot] committed Jul 28, 2022
1 parent 298209f commit d705af0
Showing 1 changed file with 19 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@

import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.it.util.GrpcClientRule;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
Expand Down Expand Up @@ -43,7 +42,7 @@ public class ReaderCloseTest {

// Regression test for https://github.com/camunda/zeebe/issues/7767
@Test
public void shouldDeleteCompactedSegmentsFiles() throws IOException {
public void shouldDeleteCompactedSegmentsFiles() {
// given
fillSegments();

Expand All @@ -52,13 +51,13 @@ public void shouldDeleteCompactedSegmentsFiles() throws IOException {

// then
for (final Broker broker : clusteringRule.getBrokers()) {
assertThatFilesOfDeletedSegmentsDoesNotExist(broker);
awaitNoDanglingReaders(broker);
}
}

// Regression test for https://github.com/camunda/zeebe/issues/7767
@Test
public void shouldDeleteCompactedSegmentsFilesAfterLeaderChange() throws IOException {
public void shouldDeleteCompactedSegmentsFilesAfterLeaderChange() {
// given
fillSegments();
final var leaderId = clusteringRule.getLeaderForPartition(1).getNodeId();
Expand All @@ -84,35 +83,28 @@ public void shouldDeleteCompactedSegmentsFilesAfterLeaderChange() throws IOExcep

// then
for (final Broker broker : clusteringRule.getBrokers()) {
assertThatFilesOfDeletedSegmentsDoesNotExist(broker);
awaitNoDanglingReaders(broker);
}

assertThat(leaderId).isNotEqualTo(clusteringRule.getLeaderForPartition(1).getNodeId());
}

private void assertThatFilesOfDeletedSegmentsDoesNotExist(final Broker leader)
throws IOException {
final var segmentDirectory = clusteringRule.getSegmentsDirectory(leader);
try (final var stream =
Files.newDirectoryStream(segmentDirectory, path -> !path.toFile().isDirectory())) {
stream.forEach(
path ->
assertThat(isEitherLogOrRaftMetaFiles(path))
.as(
"The files in the segment directory should be either valid log segments or raft config and metadata. %s",
path)
.isTrue());
}
private void awaitNoDanglingReaders(final Broker broker) {
Awaitility.await("until all readers are closed, observed via segment deletion")
.atMost(Duration.ofSeconds(20))
.untilAsserted(() -> assertThatFilesOfDeletedSegmentsDoesNotExist(broker));
}

private boolean isEitherLogOrRaftMetaFiles(final Path path) {
final var filename = path.getFileName().toString();
return filename.endsWith(".log")
|| filename.endsWith(".conf")
|| filename.endsWith(".meta")
|| filename.endsWith(".lock");
private void assertThatFilesOfDeletedSegmentsDoesNotExist(final Broker leader) {
final var segmentDirectory = clusteringRule.getSegmentsDirectory(leader);
assertThat(segmentDirectory)
.as(
"broker <%s> closed all readers as it doesn't contain any marked-for-deletion segments",
leader.getConfig().getCluster().getNodeId())
.isDirectoryNotContaining("regex:.*-deleted");
}

public void fillSegments() {
private void fillSegments() {
clusteringRule.runUntilSegmentsFilled(
clusteringRule.getBrokers(),
2,
Expand Down

0 comments on commit d705af0

Please sign in to comment.