Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,10 @@
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.GroupPrincipal;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFileAttributes;
Expand All @@ -106,6 +103,7 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand Down Expand Up @@ -228,14 +226,10 @@ static void writeJar(Path jar, String... classes) throws IOException {
static Path writeZip(Path structure, String prefix) throws IOException {
Path zip = createTempDir().resolve(structure.getFileName() + ".zip");
try (ZipOutputStream stream = new ZipOutputStream(Files.newOutputStream(zip))) {
Files.walkFileTree(structure, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString();
stream.putNextEntry(new ZipEntry(target));
Files.copy(file, stream);
return FileVisitResult.CONTINUE;
}
forEachFileRecursively(structure, (file, attrs) -> {
String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString();
stream.putNextEntry(new ZipEntry(target));
Files.copy(file, stream);
});
}
return zip;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testDisruptionAfterShardFinalization() throws Exception {
ActionFuture<CreateSnapshotResponse> future = client(masterNode).admin().cluster()
.prepareCreateSnapshot(repoName, snapshot).setWaitForCompletion(true).execute();

waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L));
waitForBlockOnAnyDataNode(repoName);

NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);
Expand All @@ -197,7 +197,7 @@ public void testDisruptionAfterShardFinalization() throws Exception {
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture =
client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).execute();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(10L));
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
assertFutureThrows(snapshotFuture, SnapshotException.class);

Expand Down Expand Up @@ -228,7 +228,7 @@ public void testMasterFailOverDuringShardSnapshots() throws Exception {
final ActionFuture<CreateSnapshotResponse> snapshotResponse = internalCluster().masterClient().admin().cluster()
.prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).execute();

waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);

final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -92,6 +89,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
Expand Down Expand Up @@ -441,14 +439,9 @@ public void testCancellationCleansTempFiles() throws Exception {
if (Files.exists(shardLoc)) {
assertBusy(() -> {
try {
Files.walkFileTree(shardLoc, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
not(startsWith("recovery.")));
return FileVisitResult.CONTINUE;
}
});
forEachFileRecursively(shardLoc,
(file, attrs) -> assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
not(startsWith("recovery."))));
} catch (IOException e) {
throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -82,22 +80,21 @@ private String startBlockedCleanup(String repoName) throws Exception {
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
.setWaitForCompletion(true).get();

final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);

logger.info("--> creating a garbage data blob");
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
garbageFuture.get();

final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);

logger.info("--> starting repository cleanup");
client().admin().cluster().prepareCleanupRepository(repoName).execute();

logger.info("--> waiting for block to kick in on " + masterNode);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName);
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
return masterNode;
Expand All @@ -116,9 +113,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
}

final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);

final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
logger.info("--> write two outdated index-N blobs");
for (int i = 0; i < 2; ++i) {
final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -79,8 +78,7 @@ public void testShardClone() throws Exception {
final String sourceSnapshot = "source-snapshot";
final SnapshotInfo sourceSnapshotInfo = createFullSnapshot(repoName, sourceSnapshot);

final BlobStoreRepository repository =
(BlobStoreRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
final RepositoryData repositoryData = getRepositoryData(repoName);
final IndexId indexId = repositoryData.resolveIndexId(indexName);
final int shardId = 0;
Expand Down Expand Up @@ -167,7 +165,7 @@ public void testClonePreventsSnapshotDelete() throws Exception {
final String targetSnapshot = "target-snapshot";
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
assertFalse(cloneFuture.isDone());

ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class,
Expand Down Expand Up @@ -201,7 +199,7 @@ public void testConcurrentCloneAndSnapshot() throws Exception {
final String targetSnapshot = "target-snapshot";
final ActionFuture<CreateSnapshotResponse> snapshot2Future =
startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
awaitNumberOfSnapshotsInProgress(2);
unblockNode(repoName, dataNode);
Expand All @@ -224,7 +222,7 @@ public void testLongRunningCloneAllowsConcurrentSnapshot() throws Exception {
final String targetSnapshot = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);

final String indexFast = "index-fast";
createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
Expand Down Expand Up @@ -255,7 +253,7 @@ public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception {
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin()
.prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute();
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);

final String targetSnapshot = "target-snapshot";
assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get());
Expand All @@ -282,7 +280,7 @@ public void testDeletePreventsClone() throws Exception {
final String targetSnapshot = "target-snapshot";
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
assertFalse(deleteFuture.isDone());

ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
Expand Down Expand Up @@ -310,7 +308,7 @@ public void testBackToBackClonesForIndexNotInCluster() throws Exception {
final String targetSnapshot1 = "target-snapshot";
blockMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
assertThat(cloneFuture1.isDone(), is(false));

final int extraClones = randomIntBetween(1, 5);
Expand Down Expand Up @@ -366,7 +364,7 @@ public void testMasterFailoverDuringCloneStep1() throws Exception {
startCloneFromDataNode(repoName, sourceSnapshot, cloneName, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
internalCluster().restartNode(masterNode);
boolean cloneSucceeded = false;
try {
Expand All @@ -377,7 +375,7 @@ public void testMasterFailoverDuringCloneStep1() throws Exception {
// snapshot on disconnect slowly enough for it to work out
}

awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();

// Check if the clone operation worked out by chance as a result of the clone request being retried because of the master failover
cloneSucceeded = cloneSucceeded ||
Expand Down Expand Up @@ -418,10 +416,10 @@ public void testMasterFailoverDuringCloneStep2() throws Exception {
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
internalCluster().restartNode(masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();

assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2);
}
Expand All @@ -443,10 +441,10 @@ public void testExceptionDuringShardClone() throws Exception {
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
final String masterNode = internalCluster().getMasterName();
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations(internalCluster().getMasterName());
awaitNoMoreRunningOperations();
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}
Expand All @@ -465,7 +463,7 @@ public void testDoesNotStartOnBrokenSourceSnapshot() throws Exception {
final ActionFuture<CreateSnapshotResponse> sourceSnapshotFuture = masterClient.admin().cluster()
.prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute();
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(dataNode, repoName);
internalCluster().restartNode(dataNode);
assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));

Expand All @@ -490,7 +488,7 @@ public void testStartSnapshotWithSuccessfulShardClonePendingFinalization() throw
blockMasterOnWriteIndexFile(repoName);
final String cloneName = "clone-blocked";
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
blockNodeOnAnyFiles(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> otherSnapshot = startFullSnapshot(repoName, "other-snapshot");
Expand Down Expand Up @@ -520,7 +518,7 @@ public void testStartCloneWithSuccessfulShardClonePendingFinalization() throws E
blockMasterOnWriteIndexFile(repoName);
final String cloneName = "clone-blocked";
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
final String otherCloneName = "other-clone";
final ActionFuture<AcknowledgedResponse> otherClone = startClone(repoName, sourceSnapshot, otherCloneName, indexName);
Expand Down Expand Up @@ -549,7 +547,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw

blockMasterOnWriteIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> blockedSnapshot = startFullSnapshot(repoName, "snap-blocked");
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(masterName, repoName);
awaitNumberOfSnapshotsInProgress(1);
final String cloneName = "clone";
final ActionFuture<AcknowledgedResponse> clone = startClone(repoName, sourceSnapshot, cloneName, indexName);
Expand Down Expand Up @@ -589,13 +587,11 @@ private static ActionFuture<AcknowledgedResponse> startClone(Client client, Stri
}

private void blockMasterOnReadIndexMeta(String repoName) {
((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
.setBlockOnReadIndexMeta();
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnReadIndexMeta();
}

private void blockMasterOnShardClone(String repoName) {
((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
.setBlockOnWriteShardLevelMeta();
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
}

/**
Expand Down
Loading