diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index f7ea185dd3bb8..82480acf96169 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -51,6 +51,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.repositories.fs.ReloadableFsRepository.BLOCK_SEGMENT_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @@ -113,6 +114,26 @@ protected void setFailRate(String repoName, int value) throws ExecutionException createRepository(repoName, ReloadableFsRepository.TYPE, settings); } + protected void setBlockOnSegments(String repoName) throws ExecutionException, InterruptedException { + GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName }); + GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get(); + RepositoryMetadata rmd = res.repositories().get(0); + Settings.Builder settings = Settings.builder() + .put("location", rmd.settings().get("location")) + .put(BLOCK_SEGMENT_SETTING.getKey(), true); + createRepository(repoName, ReloadableFsRepository.TYPE, settings); + } + + protected void unsetBlockOnSegments(String repoName) throws ExecutionException, InterruptedException { + GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName }); + GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get(); + RepositoryMetadata rmd = res.repositories().get(0); + Settings.Builder settings = Settings.builder() + .put("location", rmd.settings().get("location")) + .put(BLOCK_SEGMENT_SETTING.getKey(), false); + createRepository(repoName, ReloadableFsRepository.TYPE, settings); + } + public void initDocRepToRemoteMigration() { assertTrue( internalCluster().client() diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 7cacfdc972736..c32d3520e83cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -18,14 +18,17 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.Requests; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -195,4 +198,87 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) .get(); } + + public void testMixedModeRelocation_FailInFinalize() throws Exception { + String docRepNode = internalCluster().startNode(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // create shard with 0 replica and 1 shard + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); + ensureGreen("test"); + + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); + + refresh("test"); + + // add remote node in mixed mode cluster + setAddRemote(true); + String remoteNode = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + AtomicBoolean failFinalize = new AtomicBoolean(true); + + MockTransportService remoteNodeTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + remoteNode + ); + + remoteNodeTransportService.addRequestHandlingBehavior( + PeerRecoveryTargetService.Actions.FINALIZE, + (handler, request, channel, task) -> { + if (failFinalize.get()) { + throw new IOException("Failing finalize"); + } else { + handler.messageReceived(request, channel, task); + } + } + ); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), "40s")) + .get(); + + // Change direction to remote store + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + logger.info("--> relocating from {} to {} ", docRepNode, remoteNode); + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(5)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + + assertTrue(clusterHealthResponse.getRelocatingShards() == 1); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest() + .waitForNoRelocatingShards(true) + .waitForNoInitializingShards(true); + ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet(); + assertEquals(actionGet.getRelocatingShards(), 0); + assertEquals(docRepNode, primaryNodeName("test")); + + // now unblock it + logger.info("Unblocking the finalize recovery now"); + failFinalize.set(false); + + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); + waitForRelocation(); + + asyncIndexingService.stopIndexing(); + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) + .get(); + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java index b6644ffd16bab..931841ae3de6e 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java @@ -225,6 +225,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { + Files.createDirectories(path); try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { final int bufferSize = blobStore.bufferSizeInBytes(); org.opensearch.common.util.io.Streams.copy( diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f26e53967b873..df841dac4cf8e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5056,7 +5056,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException { */ public void deleteRemoteStoreContents() throws IOException { deleteTranslogFilesFromRemoteTranslog(); - getRemoteDirectory().deleteStaleSegments(0); + getRemoteDirectory().delete(); } public void syncTranslogFilesFromRemoteTranslog() throws IOException { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 949113cce87d6..e6a791858ab89 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -141,6 +141,9 @@ public void onFailure(Exception e) { throw new IOException("Exception in listFilesByPrefixInLexicographicOrder with prefix: " + filenamePrefix, e); } if (exception.get() != null) { + if (exception.get() instanceof NoSuchFileException) { + return sortedBlobList; + } throw new IOException(exception.get()); } else { return sortedBlobList; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 941cf047347f7..54c6fe5c73376 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -925,7 +925,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException .filter(metadataFile -> allLockFiles.contains(metadataFile) == false) .collect(Collectors.toList()); - logger.debug( + logger.info( "metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}", metadataFilesEligibleToDelete, metadataFilesToBeDeleted @@ -1061,7 +1061,7 @@ private boolean deleteIfEmpty() throws IOException { return delete(); } - private boolean delete() { + public boolean delete() { try { remoteDataDirectory.delete(); remoteMetadataDirectory.delete();