From 595913892f50ee16134f2816dbb2de895962ce58 Mon Sep 17 00:00:00 2001
From: Andrew Ross
Date: Mon, 9 Oct 2023 15:50:11 -0700
Subject: [PATCH] Replace multipart download with parallel file download

There are a few open issues with the multi-stream download approach:

- Recovery stats are not being reported correctly
- It is incompatible (short of reopening and re-reading the entire file)
  with the existing Lucene checksum validation logic
- There are some issues with integrating it with the pending client-side
  encryption work

Given this, I attempted an experiment where I replaced the
multi-stream-within-a-single-file approach with simply parallelizing
downloads across files (this is how snapshot restore works). I actually got
better results with this approach: recovering a ~52GiB shard took about 4.7
minutes with the multi-stream code versus 3.9 minutes with the parallel file
approach (r7g.4xlarge EC2 instance, 500MiB/s EBS volume, S3 as remote
repository). I think this is the right approach, as it leverages the more
battle-tested code path and addresses the three issues listed above.

The multi-stream approach still has promise, as it will allow us to download
very large files faster (whereas with this approach a single large file can
become the long pole in the transfer operation). However, given that
segments are capped at 5GB (and are made up of multiple smaller files in
practice), we generally aren't dealing with huge files.

Signed-off-by: Andrew Ross
---
 .../repositories/s3/S3BlobContainer.java      |  48 ----
 .../s3/S3BlobStoreContainerTests.java         | 205 ----------------
 .../mocks/MockFsAsyncBlobContainer.java       |  25 --
 .../AsyncMultiStreamBlobContainer.java        |  10 -
 ...syncMultiStreamEncryptedBlobContainer.java |  76 ------
 .../blobstore/stream/read/ReadContext.java    |  70 ------
 .../stream/read/listener/FilePartWriter.java  |  44 ----
 .../read/listener/ReadContextListener.java    | 199 ---------------
 .../stream/read/listener/package-info.java    |  14 --
 .../blobstore/stream/read/package-info.java   |  13 -
 .../org/opensearch/index/IndexModule.java     |   7 +-
 .../org/opensearch/index/IndexService.java    |   9 +-
 .../opensearch/index/shard/IndexShard.java    |  47 +---
 .../opensearch/index/shard/StoreRecovery.java |  11 +-
 .../index/store/RemoteDirectory.java          |   4 -
 .../store/RemoteSegmentStoreDirectory.java    |  74 +-----
 .../RemoteSegmentStoreDirectoryFactory.java   |  11 +-
 .../store/RemoteStoreFileDownloader.java      | 146 +++++++++++
 .../opensearch/indices/IndicesService.java    |   9 +-
 .../RemoteStoreReplicationSource.java         |  32 +--
 .../main/java/org/opensearch/node/Node.java   |   6 +-
 .../blobstore/BlobStoreRepository.java        |   6 +-
 ...ultiStreamEncryptedBlobContainerTests.java | 125 ----------
 .../read/listener/FilePartWriterTests.java    |  67 -----
 .../read/listener/ListenerTestUtils.java      |  56 -----
 .../listener/ReadContextListenerTests.java    | 229 ------------------
 .../opensearch/index/IndexModuleTests.java    |   5 +-
 .../RemoteStoreRefreshListenerTests.java      |   4 +-
 ...moteSegmentStoreDirectoryFactoryTests.java |   7 +-
 .../RemoteSegmentStoreDirectoryTests.java     | 127 +---------
 .../store/RemoteStoreFileDownloaderTests.java | 119 +++++++++
 .../snapshots/BlobStoreFormatTests.java       |   6 -
 .../snapshots/SnapshotResiliencyTests.java    |   5 +-
 .../index/shard/IndexShardTestCase.java       |  11 +-
 34 files changed, 330 insertions(+), 1497 deletions(-)
 delete mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
 delete mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java
 delete mode 100644
server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java delete mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java delete mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java delete mode 100644 server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java delete mode 100644 server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java delete mode 100644 server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java delete mode 100644 server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 9ffdba5eaae3a..c62d358d640c0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -70,14 +70,12 @@ import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; -import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.AbstractBlobContainer; @@ -222,52 +220,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } } - @ExperimentalApi - @Override - public void readBlobAsync(String blobName, ActionListener listener) { - try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { - final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client(); - final String bucketName = blobStore.bucket(); - final String blobKey = buildKey(blobName); - - final CompletableFuture blobMetadataFuture = getBlobMetadata(s3AsyncClient, bucketName, blobKey); - - blobMetadataFuture.whenComplete((blobMetadata, throwable) -> { - if (throwable != null) { - Exception ex = throwable.getCause() instanceof Exception - ? (Exception) throwable.getCause() - : new Exception(throwable.getCause()); - listener.onFailure(ex); - return; - } - - try { - final List blobPartInputStreamFutures = new ArrayList<>(); - final long blobSize = blobMetadata.objectSize(); - final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount(); - final String blobChecksum = blobMetadata.checksum() == null ? 
null : blobMetadata.checksum().checksumCRC32(); - - if (numberOfParts == null) { - blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null)); - } else { - // S3 multipart files use 1 to n indexing - for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) { - final int innerPartNumber = partNumber; - blobPartInputStreamFutures.add( - () -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber) - ); - } - } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); - } catch (Exception ex) { - listener.onFailure(ex); - } - }); - } catch (Exception ex) { - listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex)); - } - } - public boolean remoteIntegrityCheckSupported() { return true; } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index e266bba372d80..1b16ea9e2b025 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -70,13 +70,11 @@ import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; -import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; @@ -98,7 +96,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -917,208 +914,6 @@ public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberO testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); } - public void testReadBlobAsyncMultiPart() throws Exception { - final String bucketName = randomAlphaOfLengthBetween(1, 10); - final String blobName = randomAlphaOfLengthBetween(1, 10); - final String checksum = randomAlphaOfLength(10); - - final long objectSize = 100L; - final int objectPartCount = 10; - final int partSize = 10; - - final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); - final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) - ); - - final S3BlobStore blobStore = mock(S3BlobStore.class); - final BlobPath blobPath = new BlobPath(); - - when(blobStore.bucket()).thenReturn(bucketName); - when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - when(blobStore.serverSideEncryption()).thenReturn(false); - when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); - - CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); - 
getObjectAttributesResponseCompletableFuture.complete( - GetObjectAttributesResponse.builder() - .checksum(Checksum.builder().checksumCRC32(checksum).build()) - .objectSize(objectSize) - .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) - .build() - ); - when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( - getObjectAttributesResponseCompletableFuture - ); - - mockObjectPartResponse(s3AsyncClient, bucketName, blobName, objectPartCount, partSize, objectSize); - - CountDownLatch countDownLatch = new CountDownLatch(1); - CountingCompletionListener readContextActionListener = new CountingCompletionListener<>(); - LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); - - final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.readBlobAsync(blobName, listener); - countDownLatch.await(); - - assertEquals(1, readContextActionListener.getResponseCount()); - assertEquals(0, readContextActionListener.getFailureCount()); - ReadContext readContext = readContextActionListener.getResponse(); - assertEquals(objectPartCount, readContext.getNumberOfParts()); - assertEquals(checksum, readContext.getBlobChecksum()); - assertEquals(objectSize, readContext.getBlobSize()); - - for (int partNumber = 1; partNumber < objectPartCount; partNumber++) { - InputStreamContainer inputStreamContainer = readContext.getPartStreams().get(partNumber).get().join(); - final int offset = partNumber * partSize; - assertEquals(partSize, inputStreamContainer.getContentLength()); - assertEquals(offset, inputStreamContainer.getOffset()); - assertEquals(partSize, inputStreamContainer.getInputStream().readAllBytes().length); - } - } - - public void testReadBlobAsyncSinglePart() throws Exception { - final String bucketName = randomAlphaOfLengthBetween(1, 10); - final String blobName = randomAlphaOfLengthBetween(1, 10); - final String checksum = randomAlphaOfLength(10); - - final int objectSize = 100; - - final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); - final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) - ); - final S3BlobStore blobStore = mock(S3BlobStore.class); - final BlobPath blobPath = new BlobPath(); - - when(blobStore.bucket()).thenReturn(bucketName); - when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - when(blobStore.serverSideEncryption()).thenReturn(false); - when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); - - CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); - getObjectAttributesResponseCompletableFuture.complete( - GetObjectAttributesResponse.builder() - .checksum(Checksum.builder().checksumCRC32(checksum).build()) - .objectSize((long) objectSize) - .build() - ); - when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( - getObjectAttributesResponseCompletableFuture - ); - - mockObjectResponse(s3AsyncClient, bucketName, blobName, objectSize); - - CountDownLatch countDownLatch = new CountDownLatch(1); - CountingCompletionListener readContextActionListener = new CountingCompletionListener<>(); - LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); - - final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - 
blobContainer.readBlobAsync(blobName, listener); - countDownLatch.await(); - - assertEquals(1, readContextActionListener.getResponseCount()); - assertEquals(0, readContextActionListener.getFailureCount()); - ReadContext readContext = readContextActionListener.getResponse(); - assertEquals(1, readContext.getNumberOfParts()); - assertEquals(checksum, readContext.getBlobChecksum()); - assertEquals(objectSize, readContext.getBlobSize()); - - InputStreamContainer inputStreamContainer = readContext.getPartStreams().stream().findFirst().get().get().join(); - assertEquals(objectSize, inputStreamContainer.getContentLength()); - assertEquals(0, inputStreamContainer.getOffset()); - assertEquals(objectSize, inputStreamContainer.getInputStream().readAllBytes().length); - - } - - public void testReadBlobAsyncFailure() throws Exception { - final String bucketName = randomAlphaOfLengthBetween(1, 10); - final String blobName = randomAlphaOfLengthBetween(1, 10); - final String checksum = randomAlphaOfLength(10); - - final long objectSize = 100L; - final int objectPartCount = 10; - - final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); - final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) - ); - - final S3BlobStore blobStore = mock(S3BlobStore.class); - final BlobPath blobPath = new BlobPath(); - - when(blobStore.bucket()).thenReturn(bucketName); - when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - when(blobStore.serverSideEncryption()).thenReturn(false); - when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); - - CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); - getObjectAttributesResponseCompletableFuture.complete( - GetObjectAttributesResponse.builder() - .checksum(Checksum.builder().checksumCRC32(checksum).build()) - .objectSize(objectSize) - .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) - .build() - ); - when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenThrow(new RuntimeException()); - - CountDownLatch countDownLatch = new CountDownLatch(1); - CountingCompletionListener readContextActionListener = new CountingCompletionListener<>(); - LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); - - final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.readBlobAsync(blobName, listener); - countDownLatch.await(); - - assertEquals(0, readContextActionListener.getResponseCount()); - assertEquals(1, readContextActionListener.getFailureCount()); - } - - public void testReadBlobAsyncOnCompleteFailureMissingData() throws Exception { - final String bucketName = randomAlphaOfLengthBetween(1, 10); - final String blobName = randomAlphaOfLengthBetween(1, 10); - final String checksum = randomAlphaOfLength(10); - - final long objectSize = 100L; - final int objectPartCount = 10; - - final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); - final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) - ); - - final S3BlobStore blobStore = mock(S3BlobStore.class); - final BlobPath blobPath = new BlobPath(); - - when(blobStore.bucket()).thenReturn(bucketName); - when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - 
when(blobStore.serverSideEncryption()).thenReturn(false); - when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); - - CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); - getObjectAttributesResponseCompletableFuture.complete( - GetObjectAttributesResponse.builder() - .checksum(Checksum.builder().build()) - .objectSize(null) - .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) - .build() - ); - when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( - getObjectAttributesResponseCompletableFuture - ); - - CountDownLatch countDownLatch = new CountDownLatch(1); - CountingCompletionListener readContextActionListener = new CountingCompletionListener<>(); - LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); - - final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.readBlobAsync(blobName, listener); - countDownLatch.await(); - - assertEquals(0, readContextActionListener.getResponseCount()); - assertEquals(1, readContextActionListener.getFailureCount()); - } - public void testGetBlobMetadata() throws Exception { final String checksum = randomAlphaOfLengthBetween(1, 10); final long objectSize = 100L; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 36987ac2d4991..d30d43d6fa604 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -14,7 +14,6 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; @@ -25,9 +24,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -118,27 +114,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } - @Override - public void readBlobAsync(String blobName, ActionListener listener) { - new Thread(() -> { - try { - long contentLength = listBlobs().get(blobName).length(); - long partSize = contentLength / 10; - int numberOfParts = (int) ((contentLength % partSize) == 0 ? 
contentLength / partSize : (contentLength / partSize) + 1); - List blobPartStreams = new ArrayList<>(); - for (int partNumber = 0; partNumber < numberOfParts; partNumber++) { - long offset = partNumber * partSize; - InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); - blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); - } - ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null); - listener.onResponse(blobReadContext); - } catch (Exception e) { - listener.onFailure(e); - } - }).start(); - } - public boolean remoteIntegrityCheckSupported() { return true; } diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java index 97f304d776f5c..345769f2770aa 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java @@ -8,8 +8,6 @@ package org.opensearch.common.blobstore; -import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.core.action.ActionListener; @@ -34,14 +32,6 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer { */ void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException; - /** - * Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container. - * @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched. - * @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob - */ - @ExperimentalApi - void readBlobAsync(String blobName, ActionListener listener); - /* * Wether underlying blobContainer can verify integrity of data after transfer. 
If true and if expected * checksum is provided in WriteContext, then the checksum of transferred data is compared with expected checksum diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java index 82bc7a0baed50..4157bf5178360 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java @@ -9,17 +9,12 @@ package org.opensearch.common.blobstore; import org.opensearch.common.StreamContext; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.crypto.CryptoHandler; -import org.opensearch.common.crypto.DecryptedRangedStreamProvider; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import java.util.stream.Collectors; /** * EncryptedBlobContainer is an encrypted BlobContainer that is backed by a @@ -44,21 +39,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp blobContainer.asyncBlobUpload(encryptedWriteContext, completionListener); } - @Override - public void readBlobAsync(String blobName, ActionListener listener) { - try { - final U cryptoContext = cryptoHandler.loadEncryptionMetadata(getEncryptedHeaderContentSupplier(blobName)); - ActionListener decryptingCompletionListener = ActionListener.map( - listener, - readContext -> new DecryptedReadContext<>(readContext, cryptoHandler, cryptoContext) - ); - - blobContainer.readBlobAsync(blobName, decryptingCompletionListener); - } catch (Exception e) { - listener.onFailure(e); - } - } - @Override public boolean remoteIntegrityCheckSupported() { return false; @@ -115,60 +95,4 @@ public InputStreamContainer provideStream(int partNumber) throws IOException { } } - - /** - * DecryptedReadContext decrypts the encrypted {@link ReadContext} by acting as a transformation wrapper around - * the encrypted object - * @param Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance - * @param Parsed Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance - */ - static class DecryptedReadContext extends ReadContext { - - private final CryptoHandler cryptoHandler; - private final U cryptoContext; - private Long blobSize; - - public DecryptedReadContext(ReadContext readContext, CryptoHandler cryptoHandler, U cryptoContext) { - super(readContext); - this.cryptoHandler = cryptoHandler; - this.cryptoContext = cryptoContext; - } - - @Override - public long getBlobSize() { - // initializes the value lazily - if (blobSize == null) { - this.blobSize = this.cryptoHandler.estimateDecryptedLength(cryptoContext, super.getBlobSize()); - } - return this.blobSize; - } - - @Override - public List getPartStreams() { - return super.getPartStreams().stream() - .map(supplier -> (StreamPartCreator) () -> supplier.get().thenApply(this::decryptInputStreamContainer)) - .collect(Collectors.toUnmodifiableList()); - } - - /** - * Transforms an encrypted {@link InputStreamContainer} to a decrypted instance - * @param inputStreamContainer encrypted input stream container instance - * @return decrypted input stream container instance - */ - private InputStreamContainer 
decryptInputStreamContainer(InputStreamContainer inputStreamContainer) { - long startOfStream = inputStreamContainer.getOffset(); - long endOfStream = startOfStream + inputStreamContainer.getContentLength() - 1; - DecryptedRangedStreamProvider decryptedStreamProvider = cryptoHandler.createDecryptingStreamOfRange( - cryptoContext, - startOfStream, - endOfStream - ); - - long adjustedPos = decryptedStreamProvider.getAdjustedRange()[0]; - long adjustedLength = decryptedStreamProvider.getAdjustedRange()[1] - adjustedPos + 1; - final InputStream decryptedStream = decryptedStreamProvider.getDecryptedStreamProvider() - .apply(inputStreamContainer.getInputStream()); - return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos); - } - } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java deleted file mode 100644 index 4bdce11ff4f9a..0000000000000 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.blobstore.stream.read; - -import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.common.io.InputStreamContainer; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -/** - * ReadContext is used to encapsulate all data needed by BlobContainer#readBlobAsync - */ -@ExperimentalApi -public class ReadContext { - private final long blobSize; - private final List asyncPartStreams; - private final String blobChecksum; - - public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum) { - this.blobSize = blobSize; - this.asyncPartStreams = asyncPartStreams; - this.blobChecksum = blobChecksum; - } - - public ReadContext(ReadContext readContext) { - this.blobSize = readContext.blobSize; - this.asyncPartStreams = readContext.asyncPartStreams; - this.blobChecksum = readContext.blobChecksum; - } - - public String getBlobChecksum() { - return blobChecksum; - } - - public int getNumberOfParts() { - return asyncPartStreams.size(); - } - - public long getBlobSize() { - return blobSize; - } - - public List getPartStreams() { - return asyncPartStreams; - } - - /** - * Functional interface defining an instance that can create an async action - * to create a part of an object represented as an InputStreamContainer. - */ - @FunctionalInterface - public interface StreamPartCreator extends Supplier> { - /** - * Kicks off a async process to start streaming. - * - * @return When the returned future is completed, streaming has - * just begun. Clients must fully consume the resulting stream. 
- */ - @Override - CompletableFuture get(); - } -} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java deleted file mode 100644 index 1a403200249cd..0000000000000 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.blobstore.stream.read.listener; - -import org.opensearch.common.annotation.InternalApi; -import org.opensearch.common.io.Channels; -import org.opensearch.common.io.InputStreamContainer; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.FileChannel; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.function.UnaryOperator; - -/** - * FilePartWriter transfers the provided stream into the specified file path using a {@link FileChannel} - * instance. - */ -@InternalApi -class FilePartWriter { - // 8 MB buffer for transfer - private static final int BUFFER_SIZE = 8 * 1024 * 2024; - - public static void write(Path fileLocation, InputStreamContainer stream, UnaryOperator rateLimiter) throws IOException { - try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { - try (InputStream inputStream = rateLimiter.apply(stream.getInputStream())) { - long streamOffset = stream.getOffset(); - final byte[] buffer = new byte[BUFFER_SIZE]; - int bytesRead; - while ((bytesRead = inputStream.read(buffer)) != -1) { - Channels.writeToChannel(buffer, 0, bytesRead, outputFileChannel, streamOffset); - streamOffset += bytesRead; - } - } - } - } -} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java deleted file mode 100644 index c77f2384ace0d..0000000000000 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.common.blobstore.stream.read.listener; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.IOUtils; -import org.opensearch.action.support.GroupedActionListener; -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.UUIDs; -import org.opensearch.common.annotation.InternalApi; -import org.opensearch.common.blobstore.stream.read.ReadContext; -import org.opensearch.core.action.ActionListener; -import org.opensearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.Collection; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.UnaryOperator; - -/** - * ReadContextListener orchestrates the async file fetch from the {@link org.opensearch.common.blobstore.BlobContainer} - * using a {@link ReadContext} callback. On response, it spawns off the download using multiple streams. - */ -@InternalApi -public class ReadContextListener implements ActionListener { - private static final Logger logger = LogManager.getLogger(ReadContextListener.class); - private static final String DOWNLOAD_PREFIX = "download."; - private final String blobName; - private final Path fileLocation; - private final String tmpFileName; - private final Path tmpFileLocation; - private final ActionListener completionListener; - private final ThreadPool threadPool; - private final UnaryOperator rateLimiter; - private final int maxConcurrentStreams; - - public ReadContextListener( - String blobName, - Path fileLocation, - ActionListener completionListener, - ThreadPool threadPool, - UnaryOperator rateLimiter, - int maxConcurrentStreams - ) { - this.blobName = blobName; - this.fileLocation = fileLocation; - this.completionListener = completionListener; - this.threadPool = threadPool; - this.rateLimiter = rateLimiter; - this.maxConcurrentStreams = maxConcurrentStreams; - this.tmpFileName = DOWNLOAD_PREFIX + UUIDs.randomBase64UUID() + "." 
+ blobName; - this.tmpFileLocation = fileLocation.getParent().resolve(tmpFileName); - } - - @Override - public void onResponse(ReadContext readContext) { - logger.debug("Received {} parts for blob {}", readContext.getNumberOfParts(), blobName); - final int numParts = readContext.getNumberOfParts(); - final AtomicBoolean anyPartStreamFailed = new AtomicBoolean(false); - final GroupedActionListener groupedListener = new GroupedActionListener<>(getFileCompletionListener(), numParts); - final Queue queue = new ConcurrentLinkedQueue<>(readContext.getPartStreams()); - final StreamPartProcessor processor = new StreamPartProcessor( - queue, - anyPartStreamFailed, - tmpFileLocation, - groupedListener, - threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY), - rateLimiter - ); - for (int i = 0; i < Math.min(maxConcurrentStreams, queue.size()); i++) { - processor.process(queue.poll()); - } - } - - @SuppressForbidden(reason = "need to fsync once all parts received") - private ActionListener> getFileCompletionListener() { - return ActionListener.wrap(response -> { - logger.trace("renaming temp file [{}] to [{}]", tmpFileLocation, fileLocation); - try { - IOUtils.fsync(tmpFileLocation, false); - Files.move(tmpFileLocation, fileLocation, StandardCopyOption.ATOMIC_MOVE); - // sync parent dir metadata - IOUtils.fsync(fileLocation.getParent(), true); - completionListener.onResponse(blobName); - } catch (IOException e) { - logger.error("Unable to rename temp file + " + tmpFileLocation, e); - completionListener.onFailure(e); - } - }, e -> { - try { - Files.deleteIfExists(tmpFileLocation); - } catch (IOException ex) { - logger.warn("Unable to clean temp file {}", tmpFileLocation); - } - completionListener.onFailure(e); - }); - } - - /* - * For Tests - */ - Path getTmpFileLocation() { - return tmpFileLocation; - } - - @Override - public void onFailure(Exception e) { - completionListener.onFailure(e); - } - - private static class StreamPartProcessor { - private static final RuntimeException CANCELED_PART_EXCEPTION = new RuntimeException( - "Canceled part download due to previous failure" - ); - private final Queue queue; - private final AtomicBoolean anyPartStreamFailed; - private final Path fileLocation; - private final GroupedActionListener completionListener; - private final Executor executor; - private final UnaryOperator rateLimiter; - - private StreamPartProcessor( - Queue queue, - AtomicBoolean anyPartStreamFailed, - Path fileLocation, - GroupedActionListener completionListener, - Executor executor, - UnaryOperator rateLimiter - ) { - this.queue = queue; - this.anyPartStreamFailed = anyPartStreamFailed; - this.fileLocation = fileLocation; - this.completionListener = completionListener; - this.executor = executor; - this.rateLimiter = rateLimiter; - } - - private void process(ReadContext.StreamPartCreator supplier) { - if (supplier == null) { - return; - } - supplier.get().whenCompleteAsync((blobPartStreamContainer, throwable) -> { - if (throwable != null) { - processFailure(throwable instanceof Exception ? 
(Exception) throwable : new RuntimeException(throwable)); - } else if (anyPartStreamFailed.get()) { - processFailure(CANCELED_PART_EXCEPTION); - } else { - try { - FilePartWriter.write(fileLocation, blobPartStreamContainer, rateLimiter); - completionListener.onResponse(fileLocation.toString()); - - // Upon successfully completing a file part, pull another - // file part off the queue to trigger asynchronous processing - process(queue.poll()); - } catch (Exception e) { - processFailure(e); - } - } - }, executor); - } - - private void processFailure(Exception e) { - if (anyPartStreamFailed.getAndSet(true) == false) { - completionListener.onFailure(e); - - // Drain the queue of pending part downloads. These can be discarded - // since they haven't started any work yet, but the listener must be - // notified for each part. - Object item = queue.poll(); - while (item != null) { - completionListener.onFailure(CANCELED_PART_EXCEPTION); - item = queue.poll(); - } - } else { - completionListener.onFailure(e); - } - try { - Files.deleteIfExists(fileLocation); - } catch (IOException ex) { - // Die silently - logger.info("Failed to delete file {} on stream failure: {}", fileLocation, ex); - } - } - } -} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java deleted file mode 100644 index fe670fe3eb25c..0000000000000 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** - * Abstractions for stream based file reads from the blob store. - * Provides listeners for performing the necessary async read operations to perform - * multi stream reads for blobs from the container. - * */ -package org.opensearch.common.blobstore.stream.read.listener; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java deleted file mode 100644 index a9e2ca35c1fa6..0000000000000 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** - * Abstractions for stream based file reads from the blob store. - * Provides support for async reads from the blob container. 
- * */ -package org.opensearch.common.blobstore.stream.read; diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 8692876412ea9..e29283724ebf8 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -81,6 +81,7 @@ import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; @@ -602,7 +603,8 @@ public IndexService newIndexService( IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, - Supplier clusterRemoteTranslogBufferIntervalSupplier + Supplier clusterRemoteTranslogBufferIntervalSupplier, + RecoverySettings recoverySettings ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -660,7 +662,8 @@ public IndexService newIndexService( recoveryStateFactory, translogFactorySupplier, clusterDefaultRefreshIntervalSupplier, - clusterRemoteTranslogBufferIntervalSupplier + clusterRemoteTranslogBufferIntervalSupplier, + recoverySettings ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index af23145be9f89..a53dbe246fa44 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -89,13 +89,13 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; @@ -179,6 +179,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final BiFunction translogFactorySupplier; private final Supplier clusterDefaultRefreshIntervalSupplier; private final Supplier clusterRemoteTranslogBufferIntervalSupplier; + private final RecoverySettings recoverySettings; public IndexService( IndexSettings indexSettings, @@ -213,7 +214,8 @@ public IndexService( IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, - Supplier clusterRemoteTranslogBufferIntervalSupplier + Supplier clusterRemoteTranslogBufferIntervalSupplier, + RecoverySettings recoverySettings ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -290,6 +292,7 @@ public IndexService( this.retentionLeaseSyncTask = 
new AsyncRetentionLeaseSyncTask(this); this.translogFactorySupplier = translogFactorySupplier; this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier; + this.recoverySettings = recoverySettings; updateFsyncTaskIfNecessary(); } @@ -522,7 +525,7 @@ public synchronized IndexShard createShard( remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), - (RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory + recoverySettings ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 251f9a5ae01c0..2643af501bbc9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -62,7 +62,6 @@ import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.PendingReplicationActions; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.DataStream; @@ -160,9 +159,8 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.store.RemoteStoreFileDownloader; import org.opensearch.index.store.Store; import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -184,6 +182,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryListener; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -201,7 +200,6 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -342,7 +340,7 @@ Runnable getGlobalCheckpointSyncer() { private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private final List internalRefreshListener = new ArrayList<>(); - private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; + private final RemoteStoreFileDownloader fileDownloader; public IndexShard( final ShardRouting shardRouting, @@ -371,10 +369,7 @@ public IndexShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, - // Wiring a directory factory here breaks some intended abstractions, but this remote directory - // factory is used not as a Lucene directory but instead to copy files from a remote store when - 
// restoring a shallow snapshot. - @Nullable final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + final RecoverySettings recoverySettings ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -470,7 +465,7 @@ public boolean shouldCache(Query query) { ? false : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; - this.remoteSegmentStoreDirectoryFactory = remoteSegmentStoreDirectoryFactory; + this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); } public ThreadPool getThreadPool() { @@ -568,6 +563,10 @@ public String getNodeId() { return translogConfig.getNodeId(); } + public RemoteStoreFileDownloader getFileDownloader() { + return fileDownloader; + } + @Override public void updateShardState( final ShardRouting newRouting, @@ -2706,7 +2705,7 @@ public void restoreFromRemoteStore(ActionListener listener) { public void restoreFromSnapshotAndRemoteStore( Repository repository, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + RepositoriesService repositoriesService, ActionListener listener ) { try { @@ -2714,7 +2713,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, remoteSegmentStoreDirectoryFactory, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } @@ -3554,7 +3553,7 @@ public void startRecovery( "from snapshot and remote store", recoveryState, recoveryListener, - l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), remoteSegmentStoreDirectoryFactory, l) + l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), repositoriesService, l) ); // indicesService.indexService(shardRouting.shardId().getIndex()).addMetadataListener(); } else { @@ -4912,7 +4911,7 @@ private String copySegmentFiles( if (toDownloadSegments.isEmpty() == false) { try { - downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); + fileDownloader.download(sourceRemoteDirectory, storeDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); } catch (Exception e) { throw new IOException("Error occurred when downloading segments from remote store", e); } @@ -4925,26 +4924,6 @@ private String copySegmentFiles( return segmentNFile; } - private void downloadSegments( - Directory storeDirectory, - RemoteSegmentStoreDirectory sourceRemoteDirectory, - RemoteSegmentStoreDirectory targetRemoteDirectory, - Set toDownloadSegments, - final Runnable onFileSync - ) throws IOException { - final Path indexPath = store.shardPath() == null ? 
null : store.shardPath().resolveIndex(); - final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker(); - for (String segment : toDownloadSegments) { - final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener); - segmentListener.actionGet(); - onFileSync.run(); - if (targetRemoteDirectory != null) { - targetRemoteDirectory.copyFrom(storeDirectory, segment, segment, IOContext.DEFAULT); - } - } - } - private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 762aab51469d0..c0211e1257c8e 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -70,7 +70,9 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -360,8 +362,9 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, - RemoteSegmentStoreDirectoryFactory directoryFactory, - ActionListener listener + RepositoriesService repositoriesService, + ActionListener listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -389,6 +392,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index eb75c39532d71..36d7522564e4f 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -333,10 +333,6 @@ public boolean copyFrom( return false; } - protected UnaryOperator getDownloadRateLimiter() { - return downloadRateLimiter; - } - private void uploadBlob( Directory from, String src, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index dc9706306b408..7428d0a19a8b0 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -24,8 +24,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import 
org.opensearch.common.blobstore.stream.read.listener.ReadContextListener; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.logging.Loggers; @@ -38,7 +36,6 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; @@ -46,7 +43,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -93,8 +89,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - private final RecoverySettings recoverySettings; - /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -127,15 +121,13 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, ThreadPool threadPool, - ShardId shardId, - RecoverySettings recoverySettings + ShardId shardId ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; this.logger = Loggers.getLogger(getClass(), shardId); init(); } @@ -472,70 +464,6 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } } - /** - * Copies an existing {@code source} file from this directory to a non-existent file (also - * named {@code source}) in either {@code destinationDirectory} or {@code destinationPath}. - * If the blob container backing this directory supports multipart downloads, the {@code source} - * file will be downloaded (potentially in multiple concurrent parts) directly to - * {@code destinationPath}. This method will return immediately and {@code fileCompletionListener} - * will be notified upon completion. - *

- * If multipart downloads are not supported, then {@code source} file will be copied to a file named - * {@code source} in a single part to {@code destinationDirectory}. The download will happen on the - * calling thread and {@code fileCompletionListener} will be notified synchronously before this - * method returns. - * - * @param source The source file name - * @param destinationDirectory The destination directory (if multipart is not supported) - * @param destinationPath The destination path (if multipart is supported) - * @param fileTransferTracker Tracker used for file transfer stats - * @param fileCompletionListener The listener to notify of completion - */ - public void copyTo( - String source, - Directory destinationDirectory, - Path destinationPath, - DirectoryFileTransferTracker fileTransferTracker, - ActionListener fileCompletionListener - ) { - final String blobName = getExistingRemoteFilename(source); - if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { - long length = 0L; - try { - length = fileLength(source); - } catch (IOException ex) { - logger.error("Unable to fetch segment length for stats tracking", ex); - } - final long fileLength = length; - final long startTime = System.currentTimeMillis(); - fileTransferTracker.addTransferredBytesStarted(fileLength); - final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer(); - final Path destinationFilePath = destinationPath.resolve(source); - final ActionListener completionListener = ActionListener.wrap(response -> { - fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime); - fileCompletionListener.onResponse(response); - }, e -> { - fileTransferTracker.addTransferredBytesFailed(fileLength, startTime); - fileCompletionListener.onFailure(e); - }); - final ReadContextListener readContextListener = new ReadContextListener( - blobName, - destinationFilePath, - completionListener, - threadPool, - remoteDataDirectory.getDownloadRateLimiter(), - recoverySettings.getMaxConcurrentRemoteStoreStreams() - ); - blobContainer.readBlobAsync(blobName, readContextListener); - } else { - // Fallback to older mechanism of downloading the file - ActionListener.completeWith(fileCompletionListener, () -> { - destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT); - return source; - }); - } - } - /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index cc55380894ecd..a5e89ec6a8327 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -15,7 +15,6 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -35,18 +34,12 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private static final String SEGMENTS = "segments"; private 
final Supplier<RepositoriesService> repositoriesService; - private final RecoverySettings recoverySettings; private final ThreadPool threadPool; - public RemoteSegmentStoreDirectoryFactory( - Supplier<RepositoriesService> repositoriesService, - ThreadPool threadPool, - RecoverySettings recoverySettings - ) { + public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; } @Override @@ -78,7 +71,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s String.valueOf(shardId.id()) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, recoverySettings); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java new file mode 100644 index 0000000000000..36e89707b9716 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java @@ -0,0 +1,146 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.concurrent.UncategorizedExecutionException; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; + +/** + * Helper class to download files from a {@link RemoteSegmentStoreDirectory} + * instance to a local {@link Directory} instance in parallel depending on thread + * pool size and recovery settings. + */ +@InternalApi +public final class RemoteStoreFileDownloader { + private final Logger logger; + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + + public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) { + this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId); + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory.
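+ * <p> + * A minimal usage sketch (illustrative only; the {@code downloader}, {@code remote}, and {@code local} names are assumed here, not part of this change): + * <pre>{@code + * // blocks the calling thread until every listed segment file has been copied locally + * downloader.download(remote, local, List.of("_0.cfe", "_0.cfs", "_0.si")); + * }</pre>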
+ * @param source The remote directory to copy segment files from + * @param destination The local directory to copy segment files to + * @param toDownloadSegments The list of segment files to download + */ + public void download(Directory source, Directory destination, Collection<String> toDownloadSegments) throws IOException { + downloadInternal(source, destination, null, toDownloadSegments, () -> {}); + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory, while also copying the segments _to_ another remote directory. + * @param source The remote directory to copy segment files from + * @param destination The local directory to copy segment files to + * @param secondDestination The second remote directory that segment files are + * copied to after being copied to the local directory + * @param toDownloadSegments The list of segment files to download + * @param onFileCompletion A generic runnable that is invoked after each file download. + * Must be thread safe as this may be invoked concurrently from + * different threads. + */ + public void download( + Directory source, + Directory destination, + Directory secondDestination, + Collection<String> toDownloadSegments, + Runnable onFileCompletion + ) throws IOException { + downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion); + } + + private void downloadInternal( + Directory source, + Directory destination, + @Nullable Directory secondDestination, + Collection<String> toDownloadSegments, + Runnable onFileCompletion + ) throws IOException { + final Queue<String> queue = new ConcurrentLinkedQueue<>(toDownloadSegments); + // Choose the minimum of: + // - number of files to download + // - max thread pool size + // - "indices.recovery.max_concurrent_remote_store_streams" setting + final int threads = Math.min( + toDownloadSegments.size(), + Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams()) + ); + logger.trace("Starting download of {} files with {} threads", queue.size(), threads); + final PlainActionFuture<Collection<Void>> listener = PlainActionFuture.newFuture(); + final ActionListener<Void> allFilesListener = new GroupedActionListener<>(listener, threads); + for (int i = 0; i < threads; i++) { + threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY) + .submit(() -> copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener)); + } + try { + listener.actionGet(); + } catch (UncategorizedExecutionException e) { + // Any IOException will be double-wrapped so dig it out and throw it + if (e.getCause() instanceof ExecutionException) { + if (e.getCause().getCause() instanceof IOException) { + throw (IOException) e.getCause().getCause(); + } + } + throw e; + } + } + + private void copyOneFile( + Directory source, + Directory destination, + @Nullable Directory secondDestination, + Queue<String> queue, + Runnable onFileCompletion, + ActionListener<Void> listener + ) { + final String file = queue.poll(); + if (file == null) { + // Queue is empty, so notify listener we are done + listener.onResponse(null); + } else { + logger.trace("Downloading file {}", file); + try { + destination.copyFrom(source, file, file, IOContext.DEFAULT); + onFileCompletion.run(); + if (secondDestination != null) { + secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); + } + } catch (Exception e) { + // Clear the queue to stop any future processing, report the failure, then return + queue.clear(); + listener.onFailure(e);
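+ // the failure is propagated through the grouped listener to the caller blocked in downloadInternal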
return; + } + copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..e27a597007b62 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -144,6 +144,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; @@ -335,6 +336,7 @@ public class IndicesService extends AbstractLifecycleComponent private final CountDownLatch closeLatch = new CountDownLatch(1); private volatile boolean idFieldDataEnabled; private volatile boolean allowExpensiveQueries; + private final RecoverySettings recoverySettings; @Nullable private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor; @@ -380,7 +382,8 @@ public IndicesService( Supplier<RepositoriesService> repositoriesServiceSupplier, FileCacheCleaner fileCacheCleaner, SearchRequestStats searchRequestStats, - @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + RecoverySettings recoverySettings ) { this.settings = settings; this.threadPool = threadPool; @@ -477,6 +480,7 @@ protected void closeInternal() { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval); + this.recoverySettings = recoverySettings; } /** @@ -874,7 +878,8 @@ private synchronized IndexService createIndexService( remoteDirectoryFactory, translogFactorySupplier, this::getClusterDefaultRefreshInterval, - this::getClusterRemoteTranslogBufferInterval + this::getClusterRemoteTranslogBufferInterval, + this.recoverySettings ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index ddbcb86269aa9..d2000a56401f5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -14,20 +14,16 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.Version; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import
org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -109,49 +105,31 @@ public void getSegmentFiles( logger.debug("Downloading segment files from remote store {}", filesToFetch); RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - List<StoreFileMetadata> toDownloadSegments = new ArrayList<>(); Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll()); if (remoteSegmentMetadata != null) { try { indexShard.store().incRef(); indexShard.remoteStore().incRef(); final Directory storeDirectory = indexShard.store().directory(); - final ShardPath shardPath = indexShard.shardPath(); + final List<String> toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - toDownloadSegments.add(fileMetadata); + toDownloadSegmentNames.add(file); } - final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker(); - downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener); - logger.debug("Downloaded segment files from remote store {}", toDownloadSegments); + indexShard.getFileDownloader().download(remoteDirectory, storeDirectory, toDownloadSegmentNames); + logger.debug("Downloaded segment files from remote store {}", filesToFetch); } finally { indexShard.store().decRef(); indexShard.remoteStore().decRef(); } } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } catch (Exception e) { listener.onFailure(e); } } - private void downloadSegments( - Directory storeDirectory, - RemoteSegmentStoreDirectory remoteStoreDirectory, - List<StoreFileMetadata> toDownloadSegments, - ShardPath shardPath, - DirectoryFileTransferTracker fileTransferTracker, - ActionListener<GetSegmentFilesResponse> completionListener - ) { - final Path indexPath = shardPath == null ?
null : shardPath.resolveIndex(); - for (StoreFileMetadata storeFileMetadata : toDownloadSegments) { - final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener); - segmentListener.actionGet(); - } - completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments)); - } - @Override public String getDescription() { return "RemoteStoreReplicationSource"; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f44c8c8bea7f1..5b3b064a47c66 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -771,8 +771,7 @@ protected Node( final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( repositoriesServiceReference::get, - threadPool, - recoverySettings + threadPool ); final SearchRequestStats searchRequestStats = new SearchRequestStats(); @@ -803,7 +802,8 @@ protected Node( repositoriesServiceReference::get, fileCacheCleaner, searchRequestStats, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + recoverySettings ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index bf85e19493203..8e6649c366eeb 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1144,8 +1144,7 @@ private void executeStaleShardDelete( // see https://github.com/opensearch-project/OpenSearch/issues/8469 new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool, - recoverySettings + threadPool ).newDirectory( remoteStoreRepoForIndex, indexUUID, @@ -1615,8 +1614,7 @@ private void executeOneStaleIndexDelete( // see https://github.com/opensearch-project/OpenSearch/issues/8469 new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool, - recoverySettings + threadPool ).newDirectory( remoteStoreRepoForIndex, indexUUID, diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java deleted file mode 100644 index 1780819390052..0000000000000 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.common.blobstore; - -import org.opensearch.common.Randomness; -import org.opensearch.common.blobstore.stream.read.ReadContext; -import org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils; -import org.opensearch.common.crypto.CryptoHandler; -import org.opensearch.common.crypto.DecryptedRangedStreamProvider; -import org.opensearch.common.io.InputStreamContainer; -import org.opensearch.core.action.ActionListener; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.UnaryOperator; - -import org.mockito.Mockito; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class AsyncMultiStreamEncryptedBlobContainerTests extends OpenSearchTestCase { - - // Tests the happy path scenario for decrypting a read context - @SuppressWarnings("unchecked") - public void testReadBlobAsync() throws Exception { - String testBlobName = "testBlobName"; - int size = 100; - - // Mock objects needed for the test - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - CryptoHandler cryptoHandler = mock(CryptoHandler.class); - Object cryptoContext = mock(Object.class); - when(cryptoHandler.loadEncryptionMetadata(any())).thenReturn(cryptoContext); - when(cryptoHandler.estimateDecryptedLength(any(), anyLong())).thenReturn((long) size); - long[] adjustedRanges = { 0, size - 1 }; - DecryptedRangedStreamProvider rangedStreamProvider = new DecryptedRangedStreamProvider(adjustedRanges, UnaryOperator.identity()); - when(cryptoHandler.createDecryptingStreamOfRange(eq(cryptoContext), anyLong(), anyLong())).thenReturn(rangedStreamProvider); - - // Objects needed for API call - final byte[] data = new byte[size]; - Randomness.get().nextBytes(data); - - final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0); - final ListenerTestUtils.CountingCompletionListener completionListener = - new ListenerTestUtils.CountingCompletionListener<>(); - final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); - - Mockito.doAnswer(invocation -> { - ActionListener readContextActionListener = invocation.getArgument(1); - readContextActionListener.onResponse(readContext); - return null; - }).when(blobContainer).readBlobAsync(eq(testBlobName), any()); - - AsyncMultiStreamEncryptedBlobContainer asyncMultiStreamEncryptedBlobContainer = - new AsyncMultiStreamEncryptedBlobContainer<>(blobContainer, cryptoHandler); - asyncMultiStreamEncryptedBlobContainer.readBlobAsync(testBlobName, completionListener); - - // Assert results - ReadContext response = completionListener.getResponse(); - assertEquals(0, completionListener.getFailureCount()); - assertEquals(1, completionListener.getResponseCount()); - assertNull(completionListener.getException()); - - assertTrue(response instanceof AsyncMultiStreamEncryptedBlobContainer.DecryptedReadContext); - assertEquals(1, response.getNumberOfParts()); - assertEquals(size, response.getBlobSize()); - - InputStreamContainer responseContainer = 
response.getPartStreams().get(0).get().join(); - assertEquals(0, responseContainer.getOffset()); - assertEquals(size, responseContainer.getContentLength()); - assertEquals(100, responseContainer.getInputStream().available()); - } - - // Tests the exception scenario for decrypting a read context - @SuppressWarnings("unchecked") - public void testReadBlobAsyncException() throws Exception { - String testBlobName = "testBlobName"; - int size = 100; - - // Mock objects needed for the test - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - CryptoHandler cryptoHandler = mock(CryptoHandler.class); - when(cryptoHandler.loadEncryptionMetadata(any())).thenThrow(new IOException()); - - // Objects needed for API call - final byte[] data = new byte[size]; - Randomness.get().nextBytes(data); - final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0); - final ListenerTestUtils.CountingCompletionListener completionListener = - new ListenerTestUtils.CountingCompletionListener<>(); - final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); - - Mockito.doAnswer(invocation -> { - ActionListener readContextActionListener = invocation.getArgument(1); - readContextActionListener.onResponse(readContext); - return null; - }).when(blobContainer).readBlobAsync(eq(testBlobName), any()); - - AsyncMultiStreamEncryptedBlobContainer asyncMultiStreamEncryptedBlobContainer = - new AsyncMultiStreamEncryptedBlobContainer<>(blobContainer, cryptoHandler); - asyncMultiStreamEncryptedBlobContainer.readBlobAsync(testBlobName, completionListener); - - // Assert results - assertEquals(1, completionListener.getFailureCount()); - assertEquals(0, completionListener.getResponseCount()); - assertNull(completionListener.getResponse()); - assertTrue(completionListener.getException() instanceof IOException); - } - -} diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java deleted file mode 100644 index f2a758b9bbe10..0000000000000 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.common.blobstore.stream.read.listener; - -import org.opensearch.common.io.InputStreamContainer; -import org.opensearch.test.OpenSearchTestCase; -import org.junit.Before; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.UUID; -import java.util.function.UnaryOperator; - -public class FilePartWriterTests extends OpenSearchTestCase { - - private Path path; - - @Before - public void init() throws Exception { - path = createTempDir("FilePartWriterTests"); - } - - public void testFilePartWriter() throws Exception { - Path segmentFilePath = path.resolve(UUID.randomUUID().toString()); - int contentLength = 100; - InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength)); - InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0); - - FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity()); - - assertTrue(Files.exists(segmentFilePath)); - assertEquals(contentLength, Files.size(segmentFilePath)); - } - - public void testFilePartWriterWithOffset() throws Exception { - Path segmentFilePath = path.resolve(UUID.randomUUID().toString()); - int contentLength = 100; - int offset = 10; - InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength)); - InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset); - - FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity()); - - assertTrue(Files.exists(segmentFilePath)); - assertEquals(contentLength + offset, Files.size(segmentFilePath)); - } - - public void testFilePartWriterLargeInput() throws Exception { - Path segmentFilePath = path.resolve(UUID.randomUUID().toString()); - int contentLength = 20 * 1024 * 1024; - InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength)); - InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0); - - FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity()); - - assertTrue(Files.exists(segmentFilePath)); - assertEquals(contentLength, Files.size(segmentFilePath)); - } -} diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java deleted file mode 100644 index a3a32f6db2148..0000000000000 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.blobstore.stream.read.listener; - -import org.opensearch.core.action.ActionListener; - -/** - * Utility class containing common functionality for read listener based tests - */ -public class ListenerTestUtils { - - /** - * CountingCompletionListener acts as a verification instance for wrapping listener based calls. - * Keeps track of the last response, failure and count of response and failure invocations. 
- */ - public static class CountingCompletionListener implements ActionListener { - private int responseCount; - private int failureCount; - private T response; - private Exception exception; - - @Override - public void onResponse(T response) { - this.response = response; - responseCount++; - } - - @Override - public void onFailure(Exception e) { - exception = e; - failureCount++; - } - - public int getResponseCount() { - return responseCount; - } - - public int getFailureCount() { - return failureCount; - } - - public T getResponse() { - return response; - } - - public Exception getException() { - return exception; - } - } -} diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java deleted file mode 100644 index 0163c2275e7f4..0000000000000 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.blobstore.stream.read.listener; - -import org.apache.lucene.tests.util.LuceneTestCase.SuppressFileSystems; -import org.opensearch.action.LatchedActionListener; -import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.common.blobstore.stream.read.ReadContext; -import org.opensearch.common.io.InputStreamContainer; -import org.opensearch.core.action.ActionListener; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.function.UnaryOperator; - -import static org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils.CountingCompletionListener; - -/* - WindowsFS tries to simulate file handles in a best case simulation. - The deletion for the open file on an actual Windows system will be performed as soon as the last handle - is closed, which this simulation does not account for. Preventing use of WindowsFS for these tests. 
- */ -@SuppressFileSystems("WindowsFS") -public class ReadContextListenerTests extends OpenSearchTestCase { - - private Path path; - private static ThreadPool threadPool; - private static final int NUMBER_OF_PARTS = 5; - private static final int PART_SIZE = 10; - private static final String TEST_SEGMENT_FILE = "test_segment_file"; - private static final int MAX_CONCURRENT_STREAMS = 10; - - @BeforeClass - public static void setup() { - threadPool = new TestThreadPool(ReadContextListenerTests.class.getName()); - } - - @AfterClass - public static void cleanup() { - threadPool.shutdown(); - } - - @Before - public void init() throws Exception { - path = createTempDir("ReadContextListenerTests"); - } - - public void testReadContextListener() throws InterruptedException, IOException { - Path fileLocation = path.resolve(UUID.randomUUID().toString()); - List blobPartStreams = initializeBlobPartStreams(); - CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch); - ReadContextListener readContextListener = new ReadContextListener( - TEST_SEGMENT_FILE, - fileLocation, - completionListener, - threadPool, - UnaryOperator.identity(), - MAX_CONCURRENT_STREAMS - ); - ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null); - readContextListener.onResponse(readContext); - - countDownLatch.await(); - - assertTrue(Files.exists(fileLocation)); - assertEquals(NUMBER_OF_PARTS * PART_SIZE, Files.size(fileLocation)); - } - - public void testReadContextListenerFailure() throws Exception { - Path fileLocation = path.resolve(UUID.randomUUID().toString()); - List blobPartStreams = initializeBlobPartStreams(); - CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch); - ReadContextListener readContextListener = new ReadContextListener( - TEST_SEGMENT_FILE, - fileLocation, - completionListener, - threadPool, - UnaryOperator.identity(), - MAX_CONCURRENT_STREAMS - ); - InputStream badInputStream = new InputStream() { - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return read(); - } - - @Override - public int read() throws IOException { - throw new IOException(); - } - - @Override - public int available() { - return PART_SIZE; - } - }; - - blobPartStreams.add( - NUMBER_OF_PARTS, - () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS), - threadPool.generic() - ) - ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); - readContextListener.onResponse(readContext); - - countDownLatch.await(); - assertFalse(Files.exists(fileLocation)); - assertFalse(Files.exists(readContextListener.getTmpFileLocation())); - } - - public void testReadContextListenerException() { - Path fileLocation = path.resolve(UUID.randomUUID().toString()); - CountingCompletionListener listener = new CountingCompletionListener(); - ReadContextListener readContextListener = new ReadContextListener( - TEST_SEGMENT_FILE, - fileLocation, - listener, - threadPool, - UnaryOperator.identity(), - MAX_CONCURRENT_STREAMS - ); - IOException exception = new IOException(); - readContextListener.onFailure(exception); - assertEquals(1, listener.getFailureCount()); - assertEquals(exception, listener.getException()); - } - - public void 
testWriteToTempFile() throws Exception { - final String fileName = UUID.randomUUID().toString(); - Path fileLocation = path.resolve(fileName); - List blobPartStreams = initializeBlobPartStreams(); - CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch); - ReadContextListener readContextListener = new ReadContextListener( - TEST_SEGMENT_FILE, - fileLocation, - completionListener, - threadPool, - UnaryOperator.identity(), - MAX_CONCURRENT_STREAMS - ); - ByteArrayInputStream assertingStream = new ByteArrayInputStream(randomByteArrayOfLength(PART_SIZE)) { - @Override - public int read(byte[] b) throws IOException { - assertTrue("parts written to temp file location", Files.exists(readContextListener.getTmpFileLocation())); - return super.read(b); - } - }; - blobPartStreams.add( - NUMBER_OF_PARTS, - () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS), - threadPool.generic() - ) - ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null); - readContextListener.onResponse(readContext); - - countDownLatch.await(); - assertTrue(Files.exists(fileLocation)); - assertFalse(Files.exists(readContextListener.getTmpFileLocation())); - } - - public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { - final String fileName = UUID.randomUUID().toString(); - Path fileLocation = path.resolve(fileName); - // create an empty file at location. - Files.createFile(fileLocation); - assertEquals(0, Files.readAllBytes(fileLocation).length); - List blobPartStreams = initializeBlobPartStreams(); - CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch); - ReadContextListener readContextListener = new ReadContextListener( - TEST_SEGMENT_FILE, - fileLocation, - completionListener, - threadPool, - UnaryOperator.identity(), - MAX_CONCURRENT_STREAMS - ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); - readContextListener.onResponse(readContext); - - countDownLatch.await(); - assertTrue(Files.exists(fileLocation)); - assertEquals(50, Files.readAllBytes(fileLocation).length); - assertFalse(Files.exists(readContextListener.getTmpFileLocation())); - } - - private List initializeBlobPartStreams() { - List blobPartStreams = new ArrayList<>(); - for (int partNumber = 0; partNumber < NUMBER_OF_PARTS; partNumber++) { - InputStream testStream = new ByteArrayInputStream(randomByteArrayOfLength(PART_SIZE)); - int finalPartNumber = partNumber; - blobPartStreams.add( - () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE), - threadPool.generic() - ) - ); - } - return blobPartStreams; - } -} diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index bbd73bcf97aab..97bc822be7d51 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -258,10 +258,11 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, 
DefaultRecoverySettings.INSTANCE), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier, () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, - () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + DefaultRecoverySettings.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 941f2f48e71af..5a13f57db2c87 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -33,7 +33,6 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -156,8 +155,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { remoteMetadataDirectory, mock(RemoteStoreLockManager.class), mock(ThreadPool.class), - shardId, - DefaultRecoverySettings.INSTANCE + shardId ); FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 78c7fe64cebd9..cad5e47531cc6 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -20,7 +20,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -58,11 +57,7 @@ public void setup() { repositoriesService = mock(RepositoriesService.class); threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceSupplier, - threadPool, - DefaultRecoverySettings.INSTANCE - ); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 0f44d5c3b2f53..ca0a675d00b79 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -25,9 +25,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import 
org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; -import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -42,7 +40,6 @@ import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -51,18 +48,15 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.UnaryOperator; import org.mockito.Mockito; @@ -71,8 +65,6 @@ import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -150,14 +142,12 @@ public void setup() throws IOException { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId(), - DefaultRecoverySettings.INSTANCE + indexShard.shardId() ); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } - when(remoteDataDirectory.getDownloadRateLimiter()).thenReturn(UnaryOperator.identity()); when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); } @@ -567,118 +557,6 @@ public void onFailure(Exception e) {} storeDirectory.close(); } - public void testCopyFilesToMultipart() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); - - Mockito.doAnswer(invocation -> { - ActionListener completionListener = invocation.getArgument(1); - final CompletableFuture future = new CompletableFuture<>(); - future.complete(new InputStreamContainer(new ByteArrayInputStream(new byte[] { 42 }), 0, 1)); - completionListener.onResponse(new ReadContext(1, List.of(() -> future), "")); - return null; - }).when(blobContainer).readBlobAsync(any(), any()); - - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - 
- @Override - public void onFailure(Exception e) {} - }; - Path path = createTempDir(); - DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker(); - long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS)); - verify(blobContainer, times(1)).readBlobAsync(contains(filename), any()); - verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any()); - - // Verify stats are updated to DirectoryFileTransferTracker - assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded()); - } - - public void testCopyFilesTo() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - - public void testCopyFilesToEmptyPath() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); - - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - - public void testCopyFilesToException() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - Mockito.doThrow(new IOException()) - .when(storeDirectory) - .copyFrom(any(Directory.class), anyString(), anyString(), any(IOContext.class)); - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - - } - - @Override - public void onFailure(Exception e) { - downloadLatch.countDown(); - } - }; - Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; 
AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); @@ -688,8 +566,7 @@ public void testCopyFilesFromMultipartIOException() throws Exception { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId(), - DefaultRecoverySettings.INSTANCE + indexShard.shardId() ); populateMetadata(); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java new file mode 100644 index 0000000000000..588d9e8bb13a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.NIOFSDirectory; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private Directory source; + private Directory destination; + private Directory secondDestination; + private RemoteStoreFileDownloader fileDownloader; + private Map<String, Integer> files = new HashMap<>(); + + @Before + public void setup() throws IOException { + final int streamLimit = randomIntBetween(1, 20); + final RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put("indices.recovery.max_concurrent_remote_store_streams", streamLimit).build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + threadPool = new TestThreadPool(getTestName()); + source = new NIOFSDirectory(createTempDir()); + destination = new NIOFSDirectory(createTempDir()); + secondDestination = new NIOFSDirectory(createTempDir()); + for (int i = 0; i < 10; i++) { + final String filename = "file_" + i; + final int content = randomInt(); + try (IndexOutput output = source.createOutput(filename, IOContext.DEFAULT)) { + output.writeInt(content); + } + files.put(filename, content); + } + fileDownloader = new RemoteStoreFileDownloader( + ShardId.fromString("[RemoteStoreFileDownloaderTests][0]"), + threadPool, + recoverySettings + ); + } + + @After + public void stopThreadPool() throws Exception { + threadPool.shutdown(); + assertTrue(threadPool.awaitTermination(5, TimeUnit.SECONDS)); + } + + public void testDownload() throws IOException { + fileDownloader.download(source, destination, files.keySet()); + assertContent(files, destination); + } + + public void testDownloadWithSecondDestination() throws
IOException { + fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {}); + assertContent(files, destination); + assertContent(files, secondDestination); + } + + public void testDownloadWithFileCompletionHandler() throws IOException { + final AtomicInteger counter = new AtomicInteger(0); + fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet); + assertContent(files, destination); + assertEquals(files.size(), counter.get()); + } + + public void testDownloadNonExistentFile() { + assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, Set.of("not real"))); + } + + public void testDownloadExtraNonExistentFile() { + List<String> filesWithExtra = new ArrayList<>(files.keySet()); + filesWithExtra.add("not real"); + assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, filesWithExtra)); + } + + private static void assertContent(Map<String, Integer> expected, Directory destination) throws IOException { + // Note that Lucene will randomly write extra files (see org.apache.lucene.tests.mockfile.ExtraFS) + // so we just need to check that all the expected files are present but not that _only_ the expected + // files are present + final Set<String> actualFiles = Set.of(destination.listAll()); + for (String file : expected.keySet()) { + assertTrue(actualFiles.contains(file)); + try (IndexInput input = destination.openInput(file, IOContext.DEFAULT)) { + assertEquals(expected.get(file), Integer.valueOf(input.readInt())); + assertThrows(EOFException.class, input::readByte); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 03f0d27188027..a831a01da620d 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -41,7 +41,6 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; @@ -271,11 +270,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp completionListener.onResponse(null); } - @Override - public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) { - throw new RuntimeException("read not supported"); - } - @Override public boolean remoteIntegrityCheckSupported() { return false; diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 80731b378f369..97c5d23831965 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2067,11 +2067,12 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner, null, - new RemoteStoreStatsTrackerFactory(clusterService, settings) + new
RemoteStoreStatsTrackerFactory(clusterService, settings), + DefaultRecoverySettings.INSTANCE ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 186c1c7e78f6b..ffa37817e8a03 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -702,7 +702,7 @@ protected IndexShard newShard( remoteStoreStatsTrackerFactory, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", - null + DefaultRecoverySettings.INSTANCE ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { @@ -789,14 +789,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory( - dataDirectory, - metadataDirectory, - remoteStoreLockManager, - threadPool, - shardId, - DefaultRecoverySettings.INSTANCE - ); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {