Skip to content

Commit a7a1c24

Browse files
Simplify Snapshot ITs Further (#63655)
* Remove some more duplication and redundant logic. * Align all timeouts to 30s (60s or even 10-minute timeouts should be unnecessary; if they aren't, we should figure out why). * Remove some usage of `actionGet()` in tests (it's just evil to suppress the stack-trace).
1 parent b44a03d commit a7a1c24

File tree

17 files changed

+201
-301
lines changed

17 files changed

+201
-301
lines changed

distribution/tools/plugin-cli/src/test/java/org/elasticsearch/plugins/InstallPluginCommandTests.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,10 @@
7575
import java.nio.file.DirectoryStream;
7676
import java.nio.file.FileAlreadyExistsException;
7777
import java.nio.file.FileSystem;
78-
import java.nio.file.FileVisitResult;
7978
import java.nio.file.Files;
8079
import java.nio.file.NoSuchFileException;
8180
import java.nio.file.Path;
82-
import java.nio.file.SimpleFileVisitor;
8381
import java.nio.file.StandardCopyOption;
84-
import java.nio.file.attribute.BasicFileAttributes;
8582
import java.nio.file.attribute.GroupPrincipal;
8683
import java.nio.file.attribute.PosixFileAttributeView;
8784
import java.nio.file.attribute.PosixFileAttributes;
@@ -106,6 +103,7 @@
106103
import java.util.zip.ZipEntry;
107104
import java.util.zip.ZipOutputStream;
108105

106+
import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively;
109107
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
110108
import static org.hamcrest.CoreMatchers.equalTo;
111109
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -228,14 +226,10 @@ static void writeJar(Path jar, String... classes) throws IOException {
228226
static Path writeZip(Path structure, String prefix) throws IOException {
229227
Path zip = createTempDir().resolve(structure.getFileName() + ".zip");
230228
try (ZipOutputStream stream = new ZipOutputStream(Files.newOutputStream(zip))) {
231-
Files.walkFileTree(structure, new SimpleFileVisitor<Path>() {
232-
@Override
233-
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
234-
String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString();
235-
stream.putNextEntry(new ZipEntry(target));
236-
Files.copy(file, stream);
237-
return FileVisitResult.CONTINUE;
238-
}
229+
forEachFileRecursively(structure, (file, attrs) -> {
230+
String target = (prefix == null ? "" : prefix + "/") + structure.relativize(file).toString();
231+
stream.putNextEntry(new ZipEntry(target));
232+
Files.copy(file, stream);
239233
});
240234
}
241235
return zip;

server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void testDisruptionAfterShardFinalization() throws Exception {
171171
ActionFuture<CreateSnapshotResponse> future = client(masterNode).admin().cluster()
172172
.prepareCreateSnapshot(repoName, snapshot).setWaitForCompletion(true).execute();
173173

174-
waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L));
174+
waitForBlockOnAnyDataNode(repoName);
175175

176176
NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
177177
internalCluster().setDisruptionScheme(networkDisruption);
@@ -197,7 +197,7 @@ public void testDisruptionAfterShardFinalization() throws Exception {
197197
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
198198
final ActionFuture<CreateSnapshotResponse> snapshotFuture =
199199
client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).execute();
200-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(10L));
200+
waitForBlock(masterNode, repoName);
201201
unblockNode(repoName, masterNode);
202202
assertFutureThrows(snapshotFuture, SnapshotException.class);
203203

@@ -228,7 +228,7 @@ public void testMasterFailOverDuringShardSnapshots() throws Exception {
228228
final ActionFuture<CreateSnapshotResponse> snapshotResponse = internalCluster().masterClient().admin().cluster()
229229
.prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).execute();
230230

231-
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
231+
waitForBlock(dataNode, repoName);
232232

233233
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
234234
internalCluster().setDisruptionScheme(networkDisruption);

server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,8 @@
7373
import org.elasticsearch.transport.TransportService;
7474

7575
import java.io.IOException;
76-
import java.nio.file.FileVisitResult;
7776
import java.nio.file.Files;
7877
import java.nio.file.Path;
79-
import java.nio.file.SimpleFileVisitor;
80-
import java.nio.file.attribute.BasicFileAttributes;
8178
import java.util.ArrayList;
8279
import java.util.Arrays;
8380
import java.util.Collection;
@@ -92,6 +89,7 @@
9289
import java.util.stream.Stream;
9390

9491
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
92+
import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.forEachFileRecursively;
9593
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
9694
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
9795
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@@ -441,14 +439,9 @@ public void testCancellationCleansTempFiles() throws Exception {
441439
if (Files.exists(shardLoc)) {
442440
assertBusy(() -> {
443441
try {
444-
Files.walkFileTree(shardLoc, new SimpleFileVisitor<Path>() {
445-
@Override
446-
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
447-
assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
448-
not(startsWith("recovery.")));
449-
return FileVisitResult.CONTINUE;
450-
}
451-
});
442+
forEachFileRecursively(shardLoc,
443+
(file, attrs) -> assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
444+
not(startsWith("recovery."))));
452445
} catch (IOException e) {
453446
throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
454447
}

server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import org.elasticsearch.action.support.PlainActionFuture;
2424
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
2525
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.common.unit.TimeValue;
27-
import org.elasticsearch.repositories.RepositoriesService;
2826
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
2927
import org.elasticsearch.snapshots.SnapshotState;
3028
import org.elasticsearch.test.ESIntegTestCase;
@@ -82,22 +80,21 @@ private String startBlockedCleanup(String repoName) throws Exception {
8280
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
8381
.setWaitForCompletion(true).get();
8482

85-
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
86-
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
83+
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
8784

8885
logger.info("--> creating a garbage data blob");
8986
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
9087
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
9188
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
9289
garbageFuture.get();
9390

94-
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
91+
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
9592

9693
logger.info("--> starting repository cleanup");
9794
client().admin().cluster().prepareCleanupRepository(repoName).execute();
9895

99-
logger.info("--> waiting for block to kick in on " + masterNode);
100-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
96+
final String masterNode = internalCluster().getMasterName();
97+
waitForBlock(masterNode, repoName);
10198
awaitClusterState(state ->
10299
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
103100
return masterNode;
@@ -116,9 +113,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
116113
assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
117114
}
118115

119-
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
120-
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
121-
116+
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
122117
logger.info("--> write two outdated index-N blobs");
123118
for (int i = 0; i < 2; ++i) {
124119
final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture();

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.common.unit.TimeValue;
2929
import org.elasticsearch.common.util.MockBigArrays;
3030
import org.elasticsearch.index.IndexNotFoundException;
31-
import org.elasticsearch.repositories.RepositoriesService;
3231
import org.elasticsearch.repositories.RepositoryData;
3332
import org.elasticsearch.snapshots.mockstore.MockRepository;
3433
import org.elasticsearch.test.ESIntegTestCase;
@@ -79,8 +78,7 @@ public void testShardClone() throws Exception {
7978
final String sourceSnapshot = "source-snapshot";
8079
final SnapshotInfo sourceSnapshotInfo = createFullSnapshot(repoName, sourceSnapshot);
8180

82-
final BlobStoreRepository repository =
83-
(BlobStoreRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
81+
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
8482
final RepositoryData repositoryData = getRepositoryData(repoName);
8583
final IndexId indexId = repositoryData.resolveIndexId(indexName);
8684
final int shardId = 0;
@@ -167,7 +165,7 @@ public void testClonePreventsSnapshotDelete() throws Exception {
167165
final String targetSnapshot = "target-snapshot";
168166
blockNodeOnAnyFiles(repoName, masterName);
169167
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
170-
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
168+
waitForBlock(masterName, repoName);
171169
assertFalse(cloneFuture.isDone());
172170

173171
ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class,
@@ -201,7 +199,7 @@ public void testConcurrentCloneAndSnapshot() throws Exception {
201199
final String targetSnapshot = "target-snapshot";
202200
final ActionFuture<CreateSnapshotResponse> snapshot2Future =
203201
startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode);
204-
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
202+
waitForBlock(dataNode, repoName);
205203
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
206204
awaitNumberOfSnapshotsInProgress(2);
207205
unblockNode(repoName, dataNode);
@@ -224,7 +222,7 @@ public void testLongRunningCloneAllowsConcurrentSnapshot() throws Exception {
224222
final String targetSnapshot = "target-snapshot";
225223
blockMasterOnShardClone(repoName);
226224
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow);
227-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
225+
waitForBlock(masterNode, repoName);
228226

229227
final String indexFast = "index-fast";
230228
createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
@@ -255,7 +253,7 @@ public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception {
255253
blockDataNode(repoName, dataNode);
256254
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin()
257255
.prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute();
258-
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
256+
waitForBlock(dataNode, repoName);
259257

260258
final String targetSnapshot = "target-snapshot";
261259
assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get());
@@ -282,7 +280,7 @@ public void testDeletePreventsClone() throws Exception {
282280
final String targetSnapshot = "target-snapshot";
283281
blockNodeOnAnyFiles(repoName, masterName);
284282
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot);
285-
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
283+
waitForBlock(masterName, repoName);
286284
assertFalse(deleteFuture.isDone());
287285

288286
ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
@@ -310,7 +308,7 @@ public void testBackToBackClonesForIndexNotInCluster() throws Exception {
310308
final String targetSnapshot1 = "target-snapshot";
311309
blockMasterOnShardClone(repoName);
312310
final ActionFuture<AcknowledgedResponse> cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked);
313-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
311+
waitForBlock(masterNode, repoName);
314312
assertThat(cloneFuture1.isDone(), is(false));
315313

316314
final int extraClones = randomIntBetween(1, 5);
@@ -366,7 +364,7 @@ public void testMasterFailoverDuringCloneStep1() throws Exception {
366364
startCloneFromDataNode(repoName, sourceSnapshot, cloneName, testIndex);
367365
awaitNumberOfSnapshotsInProgress(1);
368366
final String masterNode = internalCluster().getMasterName();
369-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
367+
waitForBlock(masterNode, repoName);
370368
internalCluster().restartNode(masterNode);
371369
boolean cloneSucceeded = false;
372370
try {
@@ -377,7 +375,7 @@ public void testMasterFailoverDuringCloneStep1() throws Exception {
377375
// snapshot on disconnect slowly enough for it to work out
378376
}
379377

380-
awaitNoMoreRunningOperations(internalCluster().getMasterName());
378+
awaitNoMoreRunningOperations();
381379

382380
// Check if the clone operation worked out by chance as a result of the clone request being retried because of the master failover
383381
cloneSucceeded = cloneSucceeded ||
@@ -418,10 +416,10 @@ public void testMasterFailoverDuringCloneStep2() throws Exception {
418416
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
419417
awaitNumberOfSnapshotsInProgress(1);
420418
final String masterNode = internalCluster().getMasterName();
421-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
419+
waitForBlock(masterNode, repoName);
422420
internalCluster().restartNode(masterNode);
423421
expectThrows(SnapshotException.class, cloneFuture::actionGet);
424-
awaitNoMoreRunningOperations(internalCluster().getMasterName());
422+
awaitNoMoreRunningOperations();
425423

426424
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2);
427425
}
@@ -443,10 +441,10 @@ public void testExceptionDuringShardClone() throws Exception {
443441
final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
444442
awaitNumberOfSnapshotsInProgress(1);
445443
final String masterNode = internalCluster().getMasterName();
446-
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
444+
waitForBlock(masterNode, repoName);
447445
unblockNode(repoName, masterNode);
448446
expectThrows(SnapshotException.class, cloneFuture::actionGet);
449-
awaitNoMoreRunningOperations(internalCluster().getMasterName());
447+
awaitNoMoreRunningOperations();
450448
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
451449
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
452450
}
@@ -465,7 +463,7 @@ public void testDoesNotStartOnBrokenSourceSnapshot() throws Exception {
465463
final ActionFuture<CreateSnapshotResponse> sourceSnapshotFuture = masterClient.admin().cluster()
466464
.prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute();
467465
awaitNumberOfSnapshotsInProgress(1);
468-
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
466+
waitForBlock(dataNode, repoName);
469467
internalCluster().restartNode(dataNode);
470468
assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
471469

@@ -490,7 +488,7 @@ public void testStartSnapshotWithSuccessfulShardClonePendingFinalization() throw
490488
blockMasterOnWriteIndexFile(repoName);
491489
final String cloneName = "clone-blocked";
492490
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
493-
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
491+
waitForBlock(masterName, repoName);
494492
awaitNumberOfSnapshotsInProgress(1);
495493
blockNodeOnAnyFiles(repoName, dataNode);
496494
final ActionFuture<CreateSnapshotResponse> otherSnapshot = startFullSnapshot(repoName, "other-snapshot");
@@ -520,7 +518,7 @@ public void testStartCloneWithSuccessfulShardClonePendingFinalization() throws E
520518
blockMasterOnWriteIndexFile(repoName);
521519
final String cloneName = "clone-blocked";
522520
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
523-
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
521+
waitForBlock(masterName, repoName);
524522
awaitNumberOfSnapshotsInProgress(1);
525523
final String otherCloneName = "other-clone";
526524
final ActionFuture<AcknowledgedResponse> otherClone = startClone(repoName, sourceSnapshot, otherCloneName, indexName);
@@ -549,7 +547,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw
549547

550548
blockMasterOnWriteIndexFile(repoName);
551549
final ActionFuture<CreateSnapshotResponse> blockedSnapshot = startFullSnapshot(repoName, "snap-blocked");
552-
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
550+
waitForBlock(masterName, repoName);
553551
awaitNumberOfSnapshotsInProgress(1);
554552
final String cloneName = "clone";
555553
final ActionFuture<AcknowledgedResponse> clone = startClone(repoName, sourceSnapshot, cloneName, indexName);
@@ -589,13 +587,11 @@ private static ActionFuture<AcknowledgedResponse> startClone(Client client, Stri
589587
}
590588

591589
private void blockMasterOnReadIndexMeta(String repoName) {
592-
((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
593-
.setBlockOnReadIndexMeta();
590+
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnReadIndexMeta();
594591
}
595592

596593
private void blockMasterOnShardClone(String repoName) {
597-
((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
598-
.setBlockOnWriteShardLevelMeta();
594+
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
599595
}
600596

601597
/**

0 commit comments

Comments
 (0)