diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
index 9ffdba5eaae3a..c62d358d640c0 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -70,14 +70,12 @@
 import org.opensearch.common.Nullable;
 import org.opensearch.common.SetOnce;
 import org.opensearch.common.StreamContext;
-import org.opensearch.common.annotation.ExperimentalApi;
 import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.BlobStoreException;
 import org.opensearch.common.blobstore.DeleteResult;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.blobstore.support.AbstractBlobContainer;
@@ -222,52 +220,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
         }
     }
 
-    @ExperimentalApi
-    @Override
-    public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
-        try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
-            final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
-            final String bucketName = blobStore.bucket();
-            final String blobKey = buildKey(blobName);
-
-            final CompletableFuture<GetObjectAttributesResponse> blobMetadataFuture = getBlobMetadata(s3AsyncClient, bucketName, blobKey);
-
-            blobMetadataFuture.whenComplete((blobMetadata, throwable) -> {
-                if (throwable != null) {
-                    Exception ex = throwable.getCause() instanceof Exception
-                        ? (Exception) throwable.getCause()
-                        : new Exception(throwable.getCause());
-                    listener.onFailure(ex);
-                    return;
-                }
-
-                try {
-                    final List<ReadContext.StreamPartCreator> blobPartInputStreamFutures = new ArrayList<>();
-                    final long blobSize = blobMetadata.objectSize();
-                    final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount();
-                    final String blobChecksum = blobMetadata.checksum() == null ? null : blobMetadata.checksum().checksumCRC32();
-
-                    if (numberOfParts == null) {
-                        blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null));
-                    } else {
-                        // S3 multipart files use 1 to n indexing
-                        for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
-                            final int innerPartNumber = partNumber;
-                            blobPartInputStreamFutures.add(
-                                () -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber)
-                            );
-                        }
-                    }
-                    listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
-                } catch (Exception ex) {
-                    listener.onFailure(ex);
-                }
-            });
-        } catch (Exception ex) {
-            listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex));
-        }
-    }
-
     public boolean remoteIntegrityCheckSupported() {
         return true;
     }
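For context on what the removed API looked like from the consumer side: each `ReadContext` part stream was a `StreamPartCreator`, i.e. a supplier that kicks off an async fetch and yields a `CompletableFuture` over the part's stream. A minimal, self-contained sketch of that contract, using simplified stand-in types rather than the actual OpenSearch classes:

```java
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch of the consumer-side contract of the removed ReadContext API.
// The nested interface is a simplified stand-in for ReadContext.StreamPartCreator.
public final class ReadContextConsumerExample {
    interface StreamPartCreator extends Supplier<CompletableFuture<InputStream>> {}

    public static void main(String[] args) throws Exception {
        List<StreamPartCreator> partStreams = List.of(
            () -> CompletableFuture.supplyAsync(() -> InputStream.nullInputStream()),
            () -> CompletableFuture.supplyAsync(() -> InputStream.nullInputStream())
        );
        // Mirrors the removed test's loop: start each part, then fully consume its stream.
        for (StreamPartCreator part : partStreams) {
            try (InputStream in = part.get().join()) {
                long bytes = in.readAllBytes().length;
                System.out.println("consumed " + bytes + " bytes");
            }
        }
    }
}
```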
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java
index e266bba372d80..1b16ea9e2b025 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java
@@ -70,13 +70,11 @@
 import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
 
-import org.opensearch.action.LatchedActionListener;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.BlobStoreException;
 import org.opensearch.common.blobstore.DeleteResult;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.core.action.ActionListener;
@@ -98,7 +96,6 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -917,208 +914,6 @@ public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberO
         testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
     }
 
-    public void testReadBlobAsyncMultiPart() throws Exception {
-        final String bucketName = randomAlphaOfLengthBetween(1, 10);
-        final String blobName = randomAlphaOfLengthBetween(1, 10);
-        final String checksum = randomAlphaOfLength(10);
-
-        final long objectSize = 100L;
-        final int objectPartCount = 10;
-        final int partSize = 10;
-
-        final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
-        final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference(
-            AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null)
-        );
-
-        final S3BlobStore blobStore = mock(S3BlobStore.class);
-        final BlobPath blobPath = new BlobPath();
-
-        when(blobStore.bucket()).thenReturn(bucketName);
-        when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
-        when(blobStore.serverSideEncryption()).thenReturn(false);
-        when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference);
-
-        CompletableFuture<GetObjectAttributesResponse> getObjectAttributesResponseCompletableFuture = new CompletableFuture<>();
-        getObjectAttributesResponseCompletableFuture.complete(
-            GetObjectAttributesResponse.builder()
-                .checksum(Checksum.builder().checksumCRC32(checksum).build())
-                .objectSize(objectSize)
-                .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build())
-                .build()
-        );
-        when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn(
-            getObjectAttributesResponseCompletableFuture
-        );
-
-        mockObjectPartResponse(s3AsyncClient, bucketName, blobName, objectPartCount, partSize, objectSize);
-
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        CountingCompletionListener<ReadContext> readContextActionListener = new CountingCompletionListener<>();
-        LatchedActionListener<ReadContext> listener = new LatchedActionListener<>(readContextActionListener, countDownLatch);
-
-        final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
-        blobContainer.readBlobAsync(blobName, listener);
-        countDownLatch.await();
-
-        assertEquals(1, readContextActionListener.getResponseCount());
-        assertEquals(0, readContextActionListener.getFailureCount());
-        ReadContext readContext = readContextActionListener.getResponse();
-        assertEquals(objectPartCount, readContext.getNumberOfParts());
-        assertEquals(checksum, readContext.getBlobChecksum());
-        assertEquals(objectSize, readContext.getBlobSize());
-
-        for (int partNumber = 1; partNumber < objectPartCount; partNumber++) {
-            InputStreamContainer inputStreamContainer = readContext.getPartStreams().get(partNumber).get().join();
-            final int offset = partNumber * partSize;
-            assertEquals(partSize, inputStreamContainer.getContentLength());
-            assertEquals(offset, inputStreamContainer.getOffset());
-            assertEquals(partSize, inputStreamContainer.getInputStream().readAllBytes().length);
-        }
-    }
-
-    public void testReadBlobAsyncSinglePart() throws Exception {
-        final String bucketName = randomAlphaOfLengthBetween(1, 10);
-        final String blobName = randomAlphaOfLengthBetween(1, 10);
-        final String checksum = randomAlphaOfLength(10);
-
-        final int objectSize = 100;
-
-        final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
-        final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference(
-            AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null)
-        );
-        final S3BlobStore blobStore = mock(S3BlobStore.class);
-        final BlobPath blobPath = new BlobPath();
-
-        when(blobStore.bucket()).thenReturn(bucketName);
-        when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
-        when(blobStore.serverSideEncryption()).thenReturn(false);
-        when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference);
-
-        CompletableFuture<GetObjectAttributesResponse> getObjectAttributesResponseCompletableFuture = new CompletableFuture<>();
-        getObjectAttributesResponseCompletableFuture.complete(
-            GetObjectAttributesResponse.builder()
-                .checksum(Checksum.builder().checksumCRC32(checksum).build())
-                .objectSize((long) objectSize)
-                .build()
-        );
-        when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn(
-            getObjectAttributesResponseCompletableFuture
-        );
-
-        mockObjectResponse(s3AsyncClient, bucketName, blobName, objectSize);
-
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        CountingCompletionListener<ReadContext> readContextActionListener = new CountingCompletionListener<>();
-        LatchedActionListener<ReadContext> listener = new LatchedActionListener<>(readContextActionListener, countDownLatch);
-
-        final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
-        blobContainer.readBlobAsync(blobName, listener);
-        countDownLatch.await();
-
-        assertEquals(1, readContextActionListener.getResponseCount());
-        assertEquals(0, readContextActionListener.getFailureCount());
-        ReadContext readContext = readContextActionListener.getResponse();
-        assertEquals(1, readContext.getNumberOfParts());
-        assertEquals(checksum, readContext.getBlobChecksum());
-        assertEquals(objectSize, readContext.getBlobSize());
-
-        InputStreamContainer inputStreamContainer = readContext.getPartStreams().stream().findFirst().get().get().join();
-        assertEquals(objectSize, inputStreamContainer.getContentLength());
-        assertEquals(0, inputStreamContainer.getOffset());
-        assertEquals(objectSize, inputStreamContainer.getInputStream().readAllBytes().length);
-
-    }
-
-    public void testReadBlobAsyncFailure() throws Exception {
-        final String bucketName = randomAlphaOfLengthBetween(1, 10);
-        final String blobName = randomAlphaOfLengthBetween(1, 10);
-        final String checksum = randomAlphaOfLength(10);
-
-        final long objectSize = 100L;
-        final int objectPartCount = 10;
-
-        final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
-        final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference(
-            AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null)
-        );
-
-        final S3BlobStore blobStore = mock(S3BlobStore.class);
-        final BlobPath blobPath = new BlobPath();
-
-        when(blobStore.bucket()).thenReturn(bucketName);
-        when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
-        when(blobStore.serverSideEncryption()).thenReturn(false);
-        when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference);
-
-        CompletableFuture<GetObjectAttributesResponse> getObjectAttributesResponseCompletableFuture = new CompletableFuture<>();
-        getObjectAttributesResponseCompletableFuture.complete(
-            GetObjectAttributesResponse.builder()
-                .checksum(Checksum.builder().checksumCRC32(checksum).build())
-                .objectSize(objectSize)
-                .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build())
-                .build()
-        );
-        when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenThrow(new RuntimeException());
-
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        CountingCompletionListener<ReadContext> readContextActionListener = new CountingCompletionListener<>();
-        LatchedActionListener<ReadContext> listener = new LatchedActionListener<>(readContextActionListener, countDownLatch);
-
-        final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
-        blobContainer.readBlobAsync(blobName, listener);
-        countDownLatch.await();
-
-        assertEquals(0, readContextActionListener.getResponseCount());
-        assertEquals(1, readContextActionListener.getFailureCount());
-    }
-
-    public void testReadBlobAsyncOnCompleteFailureMissingData() throws Exception {
-        final String bucketName = randomAlphaOfLengthBetween(1, 10);
-        final String blobName = randomAlphaOfLengthBetween(1, 10);
-        final String checksum = randomAlphaOfLength(10);
-
-        final long objectSize = 100L;
-        final int objectPartCount = 10;
-
-        final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
-        final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference(
-            AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null)
-        );
-
-        final S3BlobStore blobStore = mock(S3BlobStore.class);
-        final BlobPath blobPath = new BlobPath();
-
-        when(blobStore.bucket()).thenReturn(bucketName);
-        when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
-        when(blobStore.serverSideEncryption()).thenReturn(false);
-        when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference);
-
-        CompletableFuture<GetObjectAttributesResponse> getObjectAttributesResponseCompletableFuture = new CompletableFuture<>();
-        getObjectAttributesResponseCompletableFuture.complete(
-            GetObjectAttributesResponse.builder()
-                .checksum(Checksum.builder().build())
-                .objectSize(null)
-                .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build())
-                .build()
-        );
-        when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn(
-            getObjectAttributesResponseCompletableFuture
-        );
-
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        CountingCompletionListener<ReadContext> readContextActionListener = new CountingCompletionListener<>();
-        LatchedActionListener<ReadContext> listener = new LatchedActionListener<>(readContextActionListener, countDownLatch);
-
-        final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
-        blobContainer.readBlobAsync(blobName, listener);
-        countDownLatch.await();
-
-        assertEquals(0, readContextActionListener.getResponseCount());
-        assertEquals(1, readContextActionListener.getFailureCount());
-    }
-
     public void testGetBlobMetadata() throws Exception {
         final String checksum = randomAlphaOfLengthBetween(1, 10);
         final long objectSize = 100L;
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java
index 36987ac2d4991..d30d43d6fa604 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java
@@ -14,7 +14,6 @@
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.fs.FsBlobContainer;
 import org.opensearch.common.blobstore.fs.FsBlobStore;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.core.action.ActionListener;
@@ -25,9 +24,6 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -118,27 +114,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
 
     }
 
-    @Override
-    public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
-        new Thread(() -> {
-            try {
-                long contentLength = listBlobs().get(blobName).length();
-                long partSize = contentLength / 10;
-                int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
-                List<ReadContext.StreamPartCreator> blobPartStreams = new ArrayList<>();
-                for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
-                    long offset = partNumber * partSize;
-                    InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
-                    blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
-                }
-                ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
-                listener.onResponse(blobReadContext);
-            } catch (Exception e) {
-                listener.onFailure(e);
-            }
-        }).start();
-    }
-
     public boolean remoteIntegrityCheckSupported() {
         return true;
     }
diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java
index 97f304d776f5c..345769f2770aa 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java
@@ -8,8 +8,6 @@
 
 package org.opensearch.common.blobstore;
 
-import org.opensearch.common.annotation.ExperimentalApi;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.core.action.ActionListener;
 
@@ -34,14 +32,6 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer {
      */
    void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;
 
-    /**
-     * Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container.
-     * @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched.
-     * @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
-     */
-    @ExperimentalApi
-    void readBlobAsync(String blobName, ActionListener<ReadContext> listener);
-
    /*
     * Whether underlying blobContainer can verify integrity of data after transfer. If true and if expected
     * checksum is provided in WriteContext, then the checksum of transferred data is compared with expected checksum
diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java
index 82bc7a0baed50..4157bf5178360 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java
@@ -9,17 +9,12 @@
 package org.opensearch.common.blobstore;
 
 import org.opensearch.common.StreamContext;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.crypto.CryptoHandler;
-import org.opensearch.common.crypto.DecryptedRangedStreamProvider;
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.core.action.ActionListener;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * EncryptedBlobContainer is an encrypted BlobContainer that is backed by a
@@ -44,21 +39,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
         blobContainer.asyncBlobUpload(encryptedWriteContext, completionListener);
     }
 
-    @Override
-    public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
-        try {
-            final U cryptoContext = cryptoHandler.loadEncryptionMetadata(getEncryptedHeaderContentSupplier(blobName));
-            ActionListener<ReadContext> decryptingCompletionListener = ActionListener.map(
-                listener,
-                readContext -> new DecryptedReadContext<>(readContext, cryptoHandler, cryptoContext)
-            );
-
-            blobContainer.readBlobAsync(blobName, decryptingCompletionListener);
-        } catch (Exception e) {
-            listener.onFailure(e);
-        }
-    }
-
     @Override
     public boolean remoteIntegrityCheckSupported() {
         return false;
@@ -115,60 +95,4 @@ public InputStreamContainer provideStream(int partNumber) throws IOException {
         }
     }
 
-    /**
-     * DecryptedReadContext decrypts the encrypted {@link ReadContext} by acting as a transformation wrapper around
-     * the encrypted object
-     * @param <T> Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance
-     * @param <U> Parsed Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance
-     */
-    static class DecryptedReadContext<T, U> extends ReadContext {
-
-        private final CryptoHandler<T, U> cryptoHandler;
-        private final U cryptoContext;
-        private Long blobSize;
-
-        public DecryptedReadContext(ReadContext readContext, CryptoHandler<T, U> cryptoHandler, U cryptoContext) {
-            super(readContext);
-            this.cryptoHandler = cryptoHandler;
-            this.cryptoContext = cryptoContext;
-        }
-
-        @Override
-        public long getBlobSize() {
-            // initializes the value lazily
-            if (blobSize == null) {
-                this.blobSize = this.cryptoHandler.estimateDecryptedLength(cryptoContext, super.getBlobSize());
-            }
-            return this.blobSize;
-        }
-
-        @Override
-        public List<StreamPartCreator> getPartStreams() {
-            return super.getPartStreams().stream()
-                .map(supplier -> (StreamPartCreator) () -> supplier.get().thenApply(this::decryptInputStreamContainer))
-                .collect(Collectors.toUnmodifiableList());
-        }
-
-        /**
-         * Transforms an encrypted {@link InputStreamContainer} to a decrypted instance
-         * @param inputStreamContainer encrypted input stream container instance
-         * @return decrypted input stream container instance
-         */
-        private InputStreamContainer decryptInputStreamContainer(InputStreamContainer inputStreamContainer) {
-            long startOfStream = inputStreamContainer.getOffset();
-            long endOfStream = startOfStream + inputStreamContainer.getContentLength() - 1;
-            DecryptedRangedStreamProvider decryptedStreamProvider = cryptoHandler.createDecryptingStreamOfRange(
-                cryptoContext,
-                startOfStream,
-                endOfStream
-            );
-
-            long adjustedPos = decryptedStreamProvider.getAdjustedRange()[0];
-            long adjustedLength = decryptedStreamProvider.getAdjustedRange()[1] - adjustedPos + 1;
-            final InputStream decryptedStream = decryptedStreamProvider.getDecryptedStreamProvider()
-                .apply(inputStreamContainer.getInputStream());
-            return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
-        }
-    }
 }
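The `DecryptedReadContext` removed above maps each encrypted part's `[offset, offset + length - 1]` range through the crypto handler, which may widen it (for example to cipher-block or frame boundaries), and then decrypts the widened range. A standalone sketch of just that arithmetic; the array and the concrete numbers stand in for `DecryptedRangedStreamProvider#getAdjustedRange()` and are hypothetical:

```java
// Sketch of the range arithmetic used by the removed DecryptedReadContext.
// The two-element array is a stand-in for DecryptedRangedStreamProvider#getAdjustedRange();
// all concrete values are illustrative.
public final class RangeAdjustmentExample {
    public static void main(String[] args) {
        long offset = 1024;        // encrypted part offset within the blob
        long contentLength = 512;  // encrypted part length
        long endOfStream = offset + contentLength - 1; // 1535

        // A crypto handler may widen [1024, 1535] to block-aligned boundaries,
        // e.g. 16-byte AES blocks: [1008, 1551].
        long[] adjustedRange = { 1008, 1551 };

        long adjustedPos = adjustedRange[0];
        long adjustedLength = adjustedRange[1] - adjustedPos + 1;
        System.out.println("decrypt range [" + adjustedPos + ", " + (adjustedPos + adjustedLength - 1) + "]");
    }
}
```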
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
deleted file mode 100644
index 4bdce11ff4f9a..0000000000000
--- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore.stream.read;
-
-import org.opensearch.common.annotation.ExperimentalApi;
-import org.opensearch.common.io.InputStreamContainer;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-/**
- * ReadContext is used to encapsulate all data needed by BlobContainer#readBlobAsync
- */
-@ExperimentalApi
-public class ReadContext {
-    private final long blobSize;
-    private final List<StreamPartCreator> asyncPartStreams;
-    private final String blobChecksum;
-
-    public ReadContext(long blobSize, List<StreamPartCreator> asyncPartStreams, String blobChecksum) {
-        this.blobSize = blobSize;
-        this.asyncPartStreams = asyncPartStreams;
-        this.blobChecksum = blobChecksum;
-    }
-
-    public ReadContext(ReadContext readContext) {
-        this.blobSize = readContext.blobSize;
-        this.asyncPartStreams = readContext.asyncPartStreams;
-        this.blobChecksum = readContext.blobChecksum;
-    }
-
-    public String getBlobChecksum() {
-        return blobChecksum;
-    }
-
-    public int getNumberOfParts() {
-        return asyncPartStreams.size();
-    }
-
-    public long getBlobSize() {
-        return blobSize;
-    }
-
-    public List<StreamPartCreator> getPartStreams() {
-        return asyncPartStreams;
-    }
-
-    /**
-     * Functional interface defining an instance that can create an async action
-     * to create a part of an object represented as an InputStreamContainer.
-     */
-    @FunctionalInterface
-    public interface StreamPartCreator extends Supplier<CompletableFuture<InputStreamContainer>> {
-        /**
-         * Kicks off an async process to start streaming.
-         *
-         * @return When the returned future is completed, streaming has
-         * just begun. Clients must fully consume the resulting stream.
-         */
-        @Override
-        CompletableFuture<InputStreamContainer> get();
-    }
-}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java
deleted file mode 100644
index 1a403200249cd..0000000000000
--- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore.stream.read.listener;
-
-import org.opensearch.common.annotation.InternalApi;
-import org.opensearch.common.io.Channels;
-import org.opensearch.common.io.InputStreamContainer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.function.UnaryOperator;
-
-/**
- * FilePartWriter transfers the provided stream into the specified file path using a {@link FileChannel}
- * instance.
- */
-@InternalApi
-class FilePartWriter {
-    // 8 MB buffer for transfer
-    private static final int BUFFER_SIZE = 8 * 1024 * 2024;
-
-    public static void write(Path fileLocation, InputStreamContainer stream, UnaryOperator<InputStream> rateLimiter) throws IOException {
-        try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-            try (InputStream inputStream = rateLimiter.apply(stream.getInputStream())) {
-                long streamOffset = stream.getOffset();
-                final byte[] buffer = new byte[BUFFER_SIZE];
-                int bytesRead;
-                while ((bytesRead = inputStream.read(buffer)) != -1) {
-                    Channels.writeToChannel(buffer, 0, bytesRead, outputFileChannel, streamOffset);
-                    streamOffset += bytesRead;
-                }
-            }
-        }
-    }
-}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java
deleted file mode 100644
index c77f2384ace0d..0000000000000
--- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore.stream.read.listener;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.util.IOUtils;
-import org.opensearch.action.support.GroupedActionListener;
-import org.opensearch.common.SuppressForbidden;
-import org.opensearch.common.UUIDs;
-import org.opensearch.common.annotation.InternalApi;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
-import org.opensearch.core.action.ActionListener;
-import org.opensearch.threadpool.ThreadPool;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.UnaryOperator;
-
-/**
- * ReadContextListener orchestrates the async file fetch from the {@link org.opensearch.common.blobstore.BlobContainer}
- * using a {@link ReadContext} callback. On response, it spawns off the download using multiple streams.
- */
-@InternalApi
-public class ReadContextListener implements ActionListener<ReadContext> {
-    private static final Logger logger = LogManager.getLogger(ReadContextListener.class);
-    private static final String DOWNLOAD_PREFIX = "download.";
-    private final String blobName;
-    private final Path fileLocation;
-    private final String tmpFileName;
-    private final Path tmpFileLocation;
-    private final ActionListener<String> completionListener;
-    private final ThreadPool threadPool;
-    private final UnaryOperator<InputStream> rateLimiter;
-    private final int maxConcurrentStreams;
-
-    public ReadContextListener(
-        String blobName,
-        Path fileLocation,
-        ActionListener<String> completionListener,
-        ThreadPool threadPool,
-        UnaryOperator<InputStream> rateLimiter,
-        int maxConcurrentStreams
-    ) {
-        this.blobName = blobName;
-        this.fileLocation = fileLocation;
-        this.completionListener = completionListener;
-        this.threadPool = threadPool;
-        this.rateLimiter = rateLimiter;
-        this.maxConcurrentStreams = maxConcurrentStreams;
-        this.tmpFileName = DOWNLOAD_PREFIX + UUIDs.randomBase64UUID() + "." + blobName;
-        this.tmpFileLocation = fileLocation.getParent().resolve(tmpFileName);
-    }
-
-    @Override
-    public void onResponse(ReadContext readContext) {
-        logger.debug("Received {} parts for blob {}", readContext.getNumberOfParts(), blobName);
-        final int numParts = readContext.getNumberOfParts();
-        final AtomicBoolean anyPartStreamFailed = new AtomicBoolean(false);
-        final GroupedActionListener<String> groupedListener = new GroupedActionListener<>(getFileCompletionListener(), numParts);
-        final Queue<ReadContext.StreamPartCreator> queue = new ConcurrentLinkedQueue<>(readContext.getPartStreams());
-        final StreamPartProcessor processor = new StreamPartProcessor(
-            queue,
-            anyPartStreamFailed,
-            tmpFileLocation,
-            groupedListener,
-            threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY),
-            rateLimiter
-        );
-        for (int i = 0; i < Math.min(maxConcurrentStreams, queue.size()); i++) {
-            processor.process(queue.poll());
-        }
-    }
-
-    @SuppressForbidden(reason = "need to fsync once all parts received")
-    private ActionListener<Collection<String>> getFileCompletionListener() {
-        return ActionListener.wrap(response -> {
-            logger.trace("renaming temp file [{}] to [{}]", tmpFileLocation, fileLocation);
-            try {
-                IOUtils.fsync(tmpFileLocation, false);
-                Files.move(tmpFileLocation, fileLocation, StandardCopyOption.ATOMIC_MOVE);
-                // sync parent dir metadata
-                IOUtils.fsync(fileLocation.getParent(), true);
-                completionListener.onResponse(blobName);
-            } catch (IOException e) {
-                logger.error("Unable to rename temp file + " + tmpFileLocation, e);
-                completionListener.onFailure(e);
-            }
-        }, e -> {
-            try {
-                Files.deleteIfExists(tmpFileLocation);
-            } catch (IOException ex) {
-                logger.warn("Unable to clean temp file {}", tmpFileLocation);
-            }
-            completionListener.onFailure(e);
-        });
-    }
-
-    /*
-     * For Tests
-     */
-    Path getTmpFileLocation() {
-        return tmpFileLocation;
-    }
-
-    @Override
-    public void onFailure(Exception e) {
-        completionListener.onFailure(e);
-    }
-
-    private static class StreamPartProcessor {
-        private static final RuntimeException CANCELED_PART_EXCEPTION = new RuntimeException(
-            "Canceled part download due to previous failure"
-        );
-        private final Queue<ReadContext.StreamPartCreator> queue;
-        private final AtomicBoolean anyPartStreamFailed;
-        private final Path fileLocation;
-        private final GroupedActionListener<String> completionListener;
-        private final Executor executor;
-        private final UnaryOperator<InputStream> rateLimiter;
-
-        private StreamPartProcessor(
-            Queue<ReadContext.StreamPartCreator> queue,
-            AtomicBoolean anyPartStreamFailed,
-            Path fileLocation,
-            GroupedActionListener<String> completionListener,
-            Executor executor,
-            UnaryOperator<InputStream> rateLimiter
-        ) {
-            this.queue = queue;
-            this.anyPartStreamFailed = anyPartStreamFailed;
-            this.fileLocation = fileLocation;
-            this.completionListener = completionListener;
-            this.executor = executor;
-            this.rateLimiter = rateLimiter;
-        }
-
-        private void process(ReadContext.StreamPartCreator supplier) {
-            if (supplier == null) {
-                return;
-            }
-            supplier.get().whenCompleteAsync((blobPartStreamContainer, throwable) -> {
-                if (throwable != null) {
-                    processFailure(throwable instanceof Exception ? (Exception) throwable : new RuntimeException(throwable));
-                } else if (anyPartStreamFailed.get()) {
-                    processFailure(CANCELED_PART_EXCEPTION);
-                } else {
-                    try {
-                        FilePartWriter.write(fileLocation, blobPartStreamContainer, rateLimiter);
-                        completionListener.onResponse(fileLocation.toString());
-
-                        // Upon successfully completing a file part, pull another
-                        // file part off the queue to trigger asynchronous processing
-                        process(queue.poll());
-                    } catch (Exception e) {
-                        processFailure(e);
-                    }
-                }
-            }, executor);
-        }
-
-        private void processFailure(Exception e) {
-            if (anyPartStreamFailed.getAndSet(true) == false) {
-                completionListener.onFailure(e);
-
-                // Drain the queue of pending part downloads. These can be discarded
-                // since they haven't started any work yet, but the listener must be
-                // notified for each part.
-                Object item = queue.poll();
-                while (item != null) {
-                    completionListener.onFailure(CANCELED_PART_EXCEPTION);
-                    item = queue.poll();
-                }
-            } else {
-                completionListener.onFailure(e);
-            }
-            try {
-                Files.deleteIfExists(fileLocation);
-            } catch (IOException ex) {
-                // Die silently
-                logger.info("Failed to delete file {} on stream failure: {}", fileLocation, ex);
-            }
-        }
-    }
-}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java
deleted file mode 100644
index fe670fe3eb25c..0000000000000
--- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java
+++ /dev/null
@@ -1,14 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-/**
- * Abstractions for stream based file reads from the blob store.
- * Provides listeners for performing the necessary async read operations to perform
- * multi stream reads for blobs from the container.
- * */
-package org.opensearch.common.blobstore.stream.read.listener;
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java
deleted file mode 100644
index a9e2ca35c1fa6..0000000000000
--- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java
+++ /dev/null
@@ -1,13 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-/**
- * Abstractions for stream based file reads from the blob store.
- * Provides support for async reads from the blob container.
- * */
-package org.opensearch.common.blobstore.stream.read;
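Both the `StreamPartProcessor` deleted above and the `RemoteStoreFileDownloader` added later in this change use the same bounded-concurrency shape: a fixed number of workers drain a shared `ConcurrentLinkedQueue`, so at most N items are in flight at once and a failure can short-circuit the rest. A standalone sketch of the pattern, with all names and values illustrative:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Bounded-concurrency sketch: N workers pull from a shared queue until it drains.
// This is not OpenSearch code; it only illustrates the shared pattern.
public final class QueueDrainExample {
    public static void main(String[] args) throws InterruptedException {
        Queue<String> queue = new ConcurrentLinkedQueue<>(java.util.List.of("_0.cfs", "_0.cfe", "_0.si", "segments_2"));
        int threads = Math.min(2, queue.size()); // analogous to the min(files, pool size, setting) computation
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            executor.submit(() -> {
                String file;
                while ((file = queue.poll()) != null) { // each worker pulls until the queue is empty
                    System.out.println(Thread.currentThread().getName() + " downloaded " + file);
                }
                done.countDown();
            });
        }
        done.await();
        executor.shutdown();
    }
}
```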
diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java
index 8692876412ea9..e29283724ebf8 100644
--- a/server/src/main/java/org/opensearch/index/IndexModule.java
+++ b/server/src/main/java/org/opensearch/index/IndexModule.java
@@ -81,6 +81,7 @@
 import org.opensearch.indices.IndicesQueryCache;
 import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.opensearch.indices.mapper.MapperRegistry;
+import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.plugins.IndexStorePlugin;
 import org.opensearch.repositories.RepositoriesService;
@@ -602,7 +603,8 @@ public IndexService newIndexService(
         IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
         BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
         Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
-        Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
+        Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
+        RecoverySettings recoverySettings
     ) throws IOException {
         final IndexEventListener eventListener = freeze();
         Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -660,7 +662,8 @@ public IndexService newIndexService(
             recoveryStateFactory,
             translogFactorySupplier,
             clusterDefaultRefreshIntervalSupplier,
-            clusterRemoteTranslogBufferIntervalSupplier
+            clusterRemoteTranslogBufferIntervalSupplier,
+            recoverySettings
         );
         success = true;
         return indexService;
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index af23145be9f89..a53dbe246fa44 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -89,13 +89,13 @@
 import org.opensearch.index.shard.ShardNotInPrimaryModeException;
 import org.opensearch.index.shard.ShardPath;
 import org.opensearch.index.similarity.SimilarityService;
-import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.TranslogFactory;
 import org.opensearch.indices.cluster.IndicesClusterStateService;
 import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.opensearch.indices.mapper.MapperRegistry;
+import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.plugins.IndexStorePlugin;
@@ -179,6 +179,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
     private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
     private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;
+    private final RecoverySettings recoverySettings;
 
     public IndexService(
         IndexSettings indexSettings,
@@ -213,7 +214,8 @@ public IndexService(
         IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
         BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
         Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
-        Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
+        Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
+        RecoverySettings recoverySettings
     ) {
         super(indexSettings);
         this.allowExpensiveQueries = allowExpensiveQueries;
@@ -290,6 +292,7 @@ public IndexService(
         this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
         this.translogFactorySupplier = translogFactorySupplier;
         this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
+        this.recoverySettings = recoverySettings;
         updateFsyncTaskIfNecessary();
     }
 
@@ -522,7 +525,7 @@ public synchronized IndexShard createShard(
                 remoteStoreStatsTrackerFactory,
                 clusterRemoteTranslogBufferIntervalSupplier,
                 nodeEnv.nodeId(),
-                (RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory
+                recoverySettings
             );
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
             eventListener.afterIndexShardCreated(indexShard);
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 251f9a5ae01c0..2643af501bbc9 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -62,7 +62,6 @@
 import org.opensearch.action.admin.indices.flush.FlushRequest;
 import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest;
-import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.action.support.replication.PendingReplicationActions;
 import org.opensearch.action.support.replication.ReplicationResponse;
 import org.opensearch.cluster.metadata.DataStream;
@@ -160,9 +159,8 @@
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
 import org.opensearch.index.similarity.SimilarityService;
-import org.opensearch.index.store.DirectoryFileTransferTracker;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
-import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
+import org.opensearch.index.store.RemoteStoreFileDownloader;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.Store.MetadataSnapshot;
 import org.opensearch.index.store.StoreFileMetadata;
@@ -184,6 +182,7 @@
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoveryFailedException;
 import org.opensearch.indices.recovery.RecoveryListener;
+import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.recovery.RecoveryTarget;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -201,7 +200,6 @@
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -342,7 +340,7 @@ Runnable getGlobalCheckpointSyncer() {
     private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;
 
     private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
-    private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory;
+    private final RemoteStoreFileDownloader fileDownloader;
 
     public IndexShard(
         final ShardRouting shardRouting,
@@ -371,10 +369,7 @@ public IndexShard(
         final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
         final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
         final String nodeId,
-        // Wiring a directory factory here breaks some intended abstractions, but this remote directory
-        // factory is used not as a Lucene directory but instead to copy files from a remote store when
-        // restoring a shallow snapshot.
-        @Nullable final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory
+        final RecoverySettings recoverySettings
     ) throws IOException {
         super(shardRouting.shardId(), indexSettings);
         assert shardRouting.initializing();
@@ -470,7 +465,7 @@ public boolean shouldCache(Query query) {
             ? false
             : mapperService.documentMapper().mappers().containsTimeStampField();
         this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
-        this.remoteSegmentStoreDirectoryFactory = remoteSegmentStoreDirectoryFactory;
+        this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
     }
 
     public ThreadPool getThreadPool() {
@@ -568,6 +563,10 @@ public String getNodeId() {
         return translogConfig.getNodeId();
     }
 
+    public RemoteStoreFileDownloader getFileDownloader() {
+        return fileDownloader;
+    }
+
     @Override
     public void updateShardState(
         final ShardRouting newRouting,
@@ -2706,7 +2705,7 @@ public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
 
     public void restoreFromSnapshotAndRemoteStore(
         Repository repository,
-        RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory,
+        RepositoriesService repositoriesService,
         ActionListener<Boolean> listener
     ) {
         try {
@@ -2714,7 +2713,7 @@ public void restoreFromSnapshotAndRemoteStore(
             assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
                 + recoveryState.getRecoverySource();
             StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
-            storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, remoteSegmentStoreDirectoryFactory, listener);
+            storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
         } catch (Exception e) {
             listener.onFailure(e);
         }
@@ -3554,7 +3553,7 @@ public void startRecovery(
                     "from snapshot and remote store",
                     recoveryState,
                     recoveryListener,
-                    l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), remoteSegmentStoreDirectoryFactory, l)
+                    l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), repositoriesService, l)
                 );
                 // indicesService.indexService(shardRouting.shardId().getIndex()).addMetadataListener();
             } else {
@@ -4912,7 +4911,7 @@ private String copySegmentFiles(
 
         if (toDownloadSegments.isEmpty() == false) {
             try {
-                downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync);
+                fileDownloader.download(sourceRemoteDirectory, storeDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync);
             } catch (Exception e) {
                 throw new IOException("Error occurred when downloading segments from remote store", e);
             }
@@ -4925,26 +4924,6 @@ private String copySegmentFiles(
         return segmentNFile;
     }
 
-    private void downloadSegments(
-        Directory storeDirectory,
-        RemoteSegmentStoreDirectory sourceRemoteDirectory,
-        RemoteSegmentStoreDirectory targetRemoteDirectory,
-        Set<String> toDownloadSegments,
-        final Runnable onFileSync
-    ) throws IOException {
-        final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
-        final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker();
-        for (String segment : toDownloadSegments) {
-            final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
-            sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener);
-            segmentListener.actionGet();
-            onFileSync.run();
-            if (targetRemoteDirectory != null) {
-                targetRemoteDirectory.copyFrom(storeDirectory, segment, segment, IOContext.DEFAULT);
-            }
-        }
-    }
-
     private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
         try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
             if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
index 762aab51469d0..c0211e1257c8e 100644
--- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -70,7 +70,9 @@
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.repositories.IndexId;
+import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.Repository;
+import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -360,8 +362,9 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A
     void recoverFromSnapshotAndRemoteStore(
         final IndexShard indexShard,
         Repository repository,
-        RemoteSegmentStoreDirectoryFactory directoryFactory,
-        ActionListener<Boolean> listener
+        RepositoriesService repositoriesService,
+        ActionListener<Boolean> listener,
+        ThreadPool threadPool
     ) {
         try {
             if (canRecover(indexShard)) {
@@ -389,6 +392,10 @@ void recoverFromSnapshotAndRemoteStore(
                     remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository();
                 }
 
+                RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
+                    () -> repositoriesService,
+                    threadPool
+                );
                 RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
                     remoteStoreRepository,
                     indexUUID,
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
index eb75c39532d71..36d7522564e4f 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
@@ -333,10 +333,6 @@ public boolean copyFrom(
         return false;
     }
 
-    protected UnaryOperator<InputStream> getDownloadRateLimiter() {
-        return downloadRateLimiter;
-    }
-
     private void uploadBlob(
         Directory from,
         String src,
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index dc9706306b408..7428d0a19a8b0 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -24,8 +24,6 @@
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.Version;
 import org.opensearch.common.UUIDs;
-import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
-import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.io.VersionedCodecStreamWrapper;
 import org.opensearch.common.logging.Loggers;
@@ -38,7 +36,6 @@
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
-import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.threadpool.ThreadPool;
 
@@ -46,7 +43,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -93,8 +89,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
 
     private final ThreadPool threadPool;
 
-    private final RecoverySettings recoverySettings;
-
     /**
      * Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
     * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time.
@@ -127,15 +121,13 @@ public RemoteSegmentStoreDirectory(
         RemoteDirectory remoteMetadataDirectory,
         RemoteStoreLockManager mdLockManager,
         ThreadPool threadPool,
-        ShardId shardId,
-        RecoverySettings recoverySettings
+        ShardId shardId
     ) throws IOException {
         super(remoteDataDirectory);
         this.remoteDataDirectory = remoteDataDirectory;
         this.remoteMetadataDirectory = remoteMetadataDirectory;
         this.mdLockManager = mdLockManager;
         this.threadPool = threadPool;
-        this.recoverySettings = recoverySettings;
         this.logger = Loggers.getLogger(getClass(), shardId);
         init();
     }
@@ -472,70 +464,6 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
         }
     }
 
-    /**
-     * Copies an existing {@code source} file from this directory to a non-existent file (also
-     * named {@code source}) in either {@code destinationDirectory} or {@code destinationPath}.
-     * If the blob container backing this directory supports multipart downloads, the {@code source}
-     * file will be downloaded (potentially in multiple concurrent parts) directly to
-     * {@code destinationPath}. This method will return immediately and {@code fileCompletionListener}
-     * will be notified upon completion.
-     * <p>
-     * If multipart downloads are not supported, then {@code source} file will be copied to a file named
-     * {@code source} in a single part to {@code destinationDirectory}. The download will happen on the
-     * calling thread and {@code fileCompletionListener} will be notified synchronously before this
-     * method returns.
-     *
-     * @param source The source file name
-     * @param destinationDirectory The destination directory (if multipart is not supported)
-     * @param destinationPath The destination path (if multipart is supported)
-     * @param fileTransferTracker Tracker used for file transfer stats
-     * @param fileCompletionListener The listener to notify of completion
-     */
-    public void copyTo(
-        String source,
-        Directory destinationDirectory,
-        Path destinationPath,
-        DirectoryFileTransferTracker fileTransferTracker,
-        ActionListener<String> fileCompletionListener
-    ) {
-        final String blobName = getExistingRemoteFilename(source);
-        if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
-            long length = 0L;
-            try {
-                length = fileLength(source);
-            } catch (IOException ex) {
-                logger.error("Unable to fetch segment length for stats tracking", ex);
-            }
-            final long fileLength = length;
-            final long startTime = System.currentTimeMillis();
-            fileTransferTracker.addTransferredBytesStarted(fileLength);
-            final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
-            final Path destinationFilePath = destinationPath.resolve(source);
-            final ActionListener<String> completionListener = ActionListener.wrap(response -> {
-                fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime);
-                fileCompletionListener.onResponse(response);
-            }, e -> {
-                fileTransferTracker.addTransferredBytesFailed(fileLength, startTime);
-                fileCompletionListener.onFailure(e);
-            });
-            final ReadContextListener readContextListener = new ReadContextListener(
-                blobName,
-                destinationFilePath,
-                completionListener,
-                threadPool,
-                remoteDataDirectory.getDownloadRateLimiter(),
-                recoverySettings.getMaxConcurrentRemoteStoreStreams()
-            );
-            blobContainer.readBlobAsync(blobName, readContextListener);
-        } else {
-            // Fallback to older mechanism of downloading the file
-            ActionListener.completeWith(fileCompletionListener, () -> {
-                destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT);
-                return source;
-            });
-        }
-    }
-
     /**
      * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo}
     *
final Supplier repositoriesService; - private final RecoverySettings recoverySettings; private final ThreadPool threadPool; - public RemoteSegmentStoreDirectoryFactory( - Supplier repositoriesService, - ThreadPool threadPool, - RecoverySettings recoverySettings - ) { + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; } @Override @@ -78,7 +71,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s String.valueOf(shardId.id()) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, recoverySettings); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java new file mode 100644 index 0000000000000..36e89707b9716 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java @@ -0,0 +1,146 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.concurrent.UncategorizedExecutionException; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; + +/** + * Helper class to downloads files from a {@link RemoteSegmentStoreDirectory} + * instance to a local {@link Directory} instance in parallel depending on thread + * pool size and recovery settings. + */ +@InternalApi +public final class RemoteStoreFileDownloader { + private final Logger logger; + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + + public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) { + this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId); + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory. 
+     * @param source The remote directory to copy segment files from
+     * @param destination The local directory to copy segment files to
+     * @param toDownloadSegments The list of segment files to download
+     */
+    public void download(Directory source, Directory destination, Collection<String> toDownloadSegments) throws IOException {
+        downloadInternal(source, destination, null, toDownloadSegments, () -> {});
+    }
+
+    /**
+     * Copies the given segments from the remote segment store to the given
+     * local directory, while also copying the segments _to_ another remote directory.
+     * @param source The remote directory to copy segment files from
+     * @param destination The local directory to copy segment files to
+     * @param secondDestination The second remote directory that segment files are
+     *                          copied to after being copied to the local directory
+     * @param toDownloadSegments The list of segment files to download
+     * @param onFileCompletion A generic runnable that is invoked after each file download.
+     *                         Must be thread safe as this may be invoked concurrently from
+     *                         different threads.
+     */
+    public void download(
+        Directory source,
+        Directory destination,
+        Directory secondDestination,
+        Collection<String> toDownloadSegments,
+        Runnable onFileCompletion
+    ) throws IOException {
+        downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion);
+    }
+
+    private void downloadInternal(
+        Directory source,
+        Directory destination,
+        @Nullable Directory secondDestination,
+        Collection<String> toDownloadSegments,
+        Runnable onFileCompletion
+    ) throws IOException {
+        final Queue<String> queue = new ConcurrentLinkedQueue<>(toDownloadSegments);
+        // Choose the minimum of:
+        // - number of files to download
+        // - max thread pool size
+        // - "indices.recovery.max_concurrent_remote_store_streams" setting
+        final int threads = Math.min(
+            toDownloadSegments.size(),
+            Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams())
+        );
+        logger.trace("Starting download of {} files with {} threads", queue.size(), threads);
+        final PlainActionFuture<Collection<Void>> listener = PlainActionFuture.newFuture();
+        final ActionListener<Void> allFilesListener = new GroupedActionListener<>(listener, threads);
+        for (int i = 0; i < threads; i++) {
+            threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)
+                .submit(() -> copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener));
+        }
+        try {
+            listener.actionGet();
+        } catch (UncategorizedExecutionException e) {
+            // Any IOException will be double-wrapped so dig it out and throw it
+            if (e.getCause() instanceof ExecutionException) {
+                if (e.getCause().getCause() instanceof IOException) {
+                    throw (IOException) e.getCause().getCause();
+                }
+            }
+            throw e;
+        }
+    }
+
+    private void copyOneFile(
+        Directory source,
+        Directory destination,
+        @Nullable Directory secondDestination,
+        Queue<String> queue,
+        Runnable onFileCompletion,
+        ActionListener<Void> listener
+    ) {
+        final String file = queue.poll();
+        if (file == null) {
+            // Queue is empty, so notify listener we are done
+            listener.onResponse(null);
+        } else {
+            logger.trace("Downloading file {}", file);
+            try {
+                destination.copyFrom(source, file, file, IOContext.DEFAULT);
+                onFileCompletion.run();
+                if (secondDestination != null) {
+                    secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
+                }
+            } catch (Exception e) {
+                // Clear the queue to stop any future processing, report the failure, then return
+                queue.clear();
+                listener.onFailure(e);
+                return;
+            }
+            copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener);
+        }
+    }
+}
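
Note: a minimal usage sketch of the new downloader, assuming the shard's thread pool and
recovery settings are available; the segment names are hypothetical examples. Concurrency
is the minimum of the file count, the REMOTE_RECOVERY pool's max size, and the dynamic
"indices.recovery.max_concurrent_remote_store_streams" setting, and the call blocks until
every file is copied, rethrowing the first IOException:

    RemoteStoreFileDownloader downloader = new RemoteStoreFileDownloader(shardId, threadPool, recoverySettings);
    // Copies the three (hypothetical) segment files in parallel, blocking the caller
    downloader.download(remoteSegmentStoreDirectory, localDirectory, List.of("_0.cfe", "_0.cfs", "_0.si"));
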
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index a72142e65c5e8..e27a597007b62 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -144,6 +144,7 @@
 import org.opensearch.indices.mapper.MapperRegistry;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoveryListener;
+import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.common.ReplicationType;
@@ -335,6 +336,7 @@ public class IndicesService extends AbstractLifecycleComponent
     private final CountDownLatch closeLatch = new CountDownLatch(1);
     private volatile boolean idFieldDataEnabled;
     private volatile boolean allowExpensiveQueries;
+    private final RecoverySettings recoverySettings;
 
     @Nullable
     private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor;
@@ -380,7 +382,8 @@ public IndicesService(
         Supplier<RepositoriesService> repositoriesServiceSupplier,
         FileCacheCleaner fileCacheCleaner,
         SearchRequestStats searchRequestStats,
-        @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
+        @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
+        RecoverySettings recoverySettings
     ) {
         this.settings = settings;
         this.threadPool = threadPool;
@@ -477,6 +480,7 @@ protected void closeInternal() {
         this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings());
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval);
+        this.recoverySettings = recoverySettings;
     }
 
     /**
@@ -874,7 +878,8 @@ private synchronized IndexService createIndexService(
             remoteDirectoryFactory,
             translogFactorySupplier,
             this::getClusterDefaultRefreshInterval,
-            this::getClusterRemoteTranslogBufferInterval
+            this::getClusterRemoteTranslogBufferInterval,
+            this.recoverySettings
         );
     }
diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
index ddbcb86269aa9..d2000a56401f5 100644
--- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
@@ -14,20 +14,16 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.util.Version;
-import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardState;
-import org.opensearch.index.shard.ShardPath;
-import org.opensearch.index.store.DirectoryFileTransferTracker;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -109,49 +105,31 @@ public void getSegmentFiles(
             logger.debug("Downloading segment files from remote store {}", filesToFetch);
 
             RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
-            List<StoreFileMetadata> toDownloadSegments = new ArrayList<>();
             Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
             if (remoteSegmentMetadata != null) {
                 try {
                     indexShard.store().incRef();
                     indexShard.remoteStore().incRef();
                     final Directory storeDirectory = indexShard.store().directory();
-                    final ShardPath shardPath = indexShard.shardPath();
+                    final List<String> toDownloadSegmentNames = new ArrayList<>();
                     for (StoreFileMetadata fileMetadata : filesToFetch) {
                         String file = fileMetadata.name();
                         assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
-                        toDownloadSegments.add(fileMetadata);
+                        toDownloadSegmentNames.add(file);
                     }
-                    final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker();
-                    downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener);
-                    logger.debug("Downloaded segment files from remote store {}", toDownloadSegments);
+                    indexShard.getFileDownloader().download(remoteDirectory, storeDirectory, toDownloadSegmentNames);
+                    logger.debug("Downloaded segment files from remote store {}", filesToFetch);
                 } finally {
                     indexShard.store().decRef();
                     indexShard.remoteStore().decRef();
                 }
             }
+            listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
         } catch (Exception e) {
            listener.onFailure(e);
        }
    }
-    private void downloadSegments(
-        Directory storeDirectory,
-        RemoteSegmentStoreDirectory remoteStoreDirectory,
-        List<StoreFileMetadata> toDownloadSegments,
-        ShardPath shardPath,
-        DirectoryFileTransferTracker fileTransferTracker,
-        ActionListener<GetSegmentFilesResponse> completionListener
-    ) {
-        final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
-        for (StoreFileMetadata storeFileMetadata : toDownloadSegments) {
-            final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
-            remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener);
-            segmentListener.actionGet();
-        }
-        completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));
-    }
-
     @Override
     public String getDescription() {
         return "RemoteStoreReplicationSource";
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index f44c8c8bea7f1..5b3b064a47c66 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -771,8 +771,7 @@ protected Node(
 
         final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
             repositoriesServiceReference::get,
-            threadPool,
-            recoverySettings
+            threadPool
         );
 
         final SearchRequestStats searchRequestStats = new SearchRequestStats();
@@ -803,7 +802,8 @@ protected Node(
             repositoriesServiceReference::get,
             fileCacheCleaner,
             searchRequestStats,
-            remoteStoreStatsTrackerFactory
+            remoteStoreStatsTrackerFactory,
+            recoverySettings
         );
 
         final AliasValidator aliasValidator = new AliasValidator();
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index bf85e19493203..8e6649c366eeb 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -1144,8 +1144,7 @@ private void executeStaleShardDelete(
                         // see https://github.com/opensearch-project/OpenSearch/issues/8469
                         new RemoteSegmentStoreDirectoryFactory(
                             remoteStoreLockManagerFactory.getRepositoriesService(),
-                            threadPool,
-                            recoverySettings
+                            threadPool
                         ).newDirectory(
                             remoteStoreRepoForIndex,
                             indexUUID,
@@ -1615,8 +1614,7 @@ private void executeOneStaleIndexDelete(
                         // see https://github.com/opensearch-project/OpenSearch/issues/8469
                         new RemoteSegmentStoreDirectoryFactory(
                             remoteStoreLockManagerFactory.getRepositoriesService(),
-                            threadPool,
-                            recoverySettings
+                            threadPool
                         ).newDirectory(
                             remoteStoreRepoForIndex,
                             indexUUID,
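
Note: the net effect of the RemoteStoreReplicationSource change is that the per-file
copyTo/PlainActionFuture round trips are replaced by one blocking, parallelized call into
the shared downloader. A consolidated sketch of the resulting flow (names follow the diff;
ref counting and error handling elided):

    // Collect the names of the segments the replica is missing, then delegate to the
    // shard-level downloader, which parallelizes and blocks until all files are copied
    List<String> toDownloadSegmentNames = new ArrayList<>();
    for (StoreFileMetadata fileMetadata : filesToFetch) {
        toDownloadSegmentNames.add(fileMetadata.name());
    }
    indexShard.getFileDownloader().download(remoteDirectory, storeDirectory, toDownloadSegmentNames);
    listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
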
diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java
deleted file mode 100644
index 1780819390052..0000000000000
--- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore;
-
-import org.opensearch.common.Randomness;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
-import org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils;
-import org.opensearch.common.crypto.CryptoHandler;
-import org.opensearch.common.crypto.DecryptedRangedStreamProvider;
-import org.opensearch.common.io.InputStreamContainer;
-import org.opensearch.core.action.ActionListener;
-import org.opensearch.test.OpenSearchTestCase;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.UnaryOperator;
-
-import org.mockito.Mockito;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AsyncMultiStreamEncryptedBlobContainerTests extends OpenSearchTestCase {
-
-    // Tests the happy path scenario for decrypting a read context
-    @SuppressWarnings("unchecked")
-    public void testReadBlobAsync() throws Exception {
-        String testBlobName = "testBlobName";
-        int size = 100;
-
-        // Mock objects needed for the test
-        AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
-        CryptoHandler<Object, Object> cryptoHandler = mock(CryptoHandler.class);
-        Object cryptoContext = mock(Object.class);
-        when(cryptoHandler.loadEncryptionMetadata(any())).thenReturn(cryptoContext);
-        when(cryptoHandler.estimateDecryptedLength(any(), anyLong())).thenReturn((long) size);
-        long[] adjustedRanges = { 0, size - 1 };
-        DecryptedRangedStreamProvider rangedStreamProvider = new DecryptedRangedStreamProvider(adjustedRanges, UnaryOperator.identity());
-        when(cryptoHandler.createDecryptingStreamOfRange(eq(cryptoContext), anyLong(), anyLong())).thenReturn(rangedStreamProvider);
-
-        // Objects needed for API call
-        final byte[] data = new byte[size];
-        Randomness.get().nextBytes(data);
-
-        final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
-        final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
-            new ListenerTestUtils.CountingCompletionListener<>();
-        final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
-        final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null);
-
-        Mockito.doAnswer(invocation -> {
-            ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
-            readContextActionListener.onResponse(readContext);
-            return null;
-        }).when(blobContainer).readBlobAsync(eq(testBlobName), any());
-
-        AsyncMultiStreamEncryptedBlobContainer<Object, Object> asyncMultiStreamEncryptedBlobContainer =
-            new AsyncMultiStreamEncryptedBlobContainer<>(blobContainer, cryptoHandler);
-        asyncMultiStreamEncryptedBlobContainer.readBlobAsync(testBlobName, completionListener);
-
-        // Assert results
-        ReadContext response = completionListener.getResponse();
-        assertEquals(0, completionListener.getFailureCount());
-        assertEquals(1, completionListener.getResponseCount());
-        assertNull(completionListener.getException());
-
-        assertTrue(response instanceof AsyncMultiStreamEncryptedBlobContainer.DecryptedReadContext);
-        assertEquals(1, response.getNumberOfParts());
-        assertEquals(size, response.getBlobSize());
-        InputStreamContainer responseContainer = response.getPartStreams().get(0).get().join();
-        assertEquals(0, responseContainer.getOffset());
-        assertEquals(size, responseContainer.getContentLength());
-        assertEquals(100, responseContainer.getInputStream().available());
-    }
-
-    // Tests the exception scenario for decrypting a read context
-    @SuppressWarnings("unchecked")
-    public void testReadBlobAsyncException() throws Exception {
-        String testBlobName = "testBlobName";
-        int size = 100;
-
-        // Mock objects needed for the test
-        AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
-        CryptoHandler<Object, Object> cryptoHandler = mock(CryptoHandler.class);
-        when(cryptoHandler.loadEncryptionMetadata(any())).thenThrow(new IOException());
-
-        // Objects needed for API call
-        final byte[] data = new byte[size];
-        Randomness.get().nextBytes(data);
-        final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
-        final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
-            new ListenerTestUtils.CountingCompletionListener<>();
-        final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
-        final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null);
-
-        Mockito.doAnswer(invocation -> {
-            ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
-            readContextActionListener.onResponse(readContext);
-            return null;
-        }).when(blobContainer).readBlobAsync(eq(testBlobName), any());
-
-        AsyncMultiStreamEncryptedBlobContainer<Object, Object> asyncMultiStreamEncryptedBlobContainer =
-            new AsyncMultiStreamEncryptedBlobContainer<>(blobContainer, cryptoHandler);
-        asyncMultiStreamEncryptedBlobContainer.readBlobAsync(testBlobName, completionListener);
-
-        // Assert results
-        assertEquals(1, completionListener.getFailureCount());
-        assertEquals(0, completionListener.getResponseCount());
-        assertNull(completionListener.getResponse());
-        assertTrue(completionListener.getException() instanceof IOException);
-    }
-
-}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java
deleted file mode 100644
index f2a758b9bbe10..0000000000000
--- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriterTests.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore.stream.read.listener;
-
-import org.opensearch.common.io.InputStreamContainer;
-import org.opensearch.test.OpenSearchTestCase;
-import org.junit.Before;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.UUID;
-import java.util.function.UnaryOperator;
-
-public class FilePartWriterTests extends OpenSearchTestCase {
-
-    private Path path;
-
-    @Before
-    public void init() throws Exception {
-        path = createTempDir("FilePartWriterTests");
-    }
-
-    public void testFilePartWriter() throws Exception {
-        Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
-        int contentLength = 100;
-        InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
-        InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0);
-
-        FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());
-
-        assertTrue(Files.exists(segmentFilePath));
-        assertEquals(contentLength, Files.size(segmentFilePath));
-    }
-
-    public void testFilePartWriterWithOffset() throws Exception {
-        Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
-        int contentLength = 100;
-        int offset = 10;
-        InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
-        InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset);
-
-        FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());
-
-        assertTrue(Files.exists(segmentFilePath));
-        assertEquals(contentLength + offset, Files.size(segmentFilePath));
-    }
-
-    public void testFilePartWriterLargeInput() throws Exception {
-        Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
-        int contentLength = 20 * 1024 * 1024;
-        InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
-        InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0);
-
-        FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());
-
-        assertTrue(Files.exists(segmentFilePath));
-        assertEquals(contentLength, Files.size(segmentFilePath));
-    }
-}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java
deleted file mode 100644
index a3a32f6db2148..0000000000000
--- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore.stream.read.listener;
-
-import org.opensearch.core.action.ActionListener;
-
-/**
- * Utility class containing common functionality for read listener based tests
- */
-public class ListenerTestUtils {
-
-    /**
-     * CountingCompletionListener acts as a verification instance for wrapping listener based calls.
-     * Keeps track of the last response, failure and count of response and failure invocations.
-     */
-    public static class CountingCompletionListener<T> implements ActionListener<T> {
-        private int responseCount;
-        private int failureCount;
-        private T response;
-        private Exception exception;
-
-        @Override
-        public void onResponse(T response) {
-            this.response = response;
-            responseCount++;
-        }
-
-        @Override
-        public void onFailure(Exception e) {
-            exception = e;
-            failureCount++;
-        }
-
-        public int getResponseCount() {
-            return responseCount;
-        }
-
-        public int getFailureCount() {
-            return failureCount;
-        }
-
-        public T getResponse() {
-            return response;
-        }
-
-        public Exception getException() {
-            return exception;
-        }
-    }
-}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java
deleted file mode 100644
index 0163c2275e7f4..0000000000000
--- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.common.blobstore.stream.read.listener;
-
-import org.apache.lucene.tests.util.LuceneTestCase.SuppressFileSystems;
-import org.opensearch.action.LatchedActionListener;
-import org.opensearch.action.support.PlainActionFuture;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
-import org.opensearch.common.io.InputStreamContainer;
-import org.opensearch.core.action.ActionListener;
-import org.opensearch.test.OpenSearchTestCase;
-import org.opensearch.threadpool.TestThreadPool;
-import org.opensearch.threadpool.ThreadPool;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.UnaryOperator;
-
-import static org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils.CountingCompletionListener;
-
-/*
- WindowsFS tries to simulate file handles in a best case simulation.
- The deletion for the open file on an actual Windows system will be performed as soon as the last handle
- is closed, which this simulation does not account for. Preventing use of WindowsFS for these tests.
- */
-@SuppressFileSystems("WindowsFS")
-public class ReadContextListenerTests extends OpenSearchTestCase {
-
-    private Path path;
-    private static ThreadPool threadPool;
-    private static final int NUMBER_OF_PARTS = 5;
-    private static final int PART_SIZE = 10;
-    private static final String TEST_SEGMENT_FILE = "test_segment_file";
-    private static final int MAX_CONCURRENT_STREAMS = 10;
-
-    @BeforeClass
-    public static void setup() {
-        threadPool = new TestThreadPool(ReadContextListenerTests.class.getName());
-    }
-
-    @AfterClass
-    public static void cleanup() {
-        threadPool.shutdown();
-    }
-
-    @Before
-    public void init() throws Exception {
-        path = createTempDir("ReadContextListenerTests");
-    }
-
-    public void testReadContextListener() throws InterruptedException, IOException {
-        Path fileLocation = path.resolve(UUID.randomUUID().toString());
-        List<ReadContext.StreamPartCreator> blobPartStreams = initializeBlobPartStreams();
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch);
-        ReadContextListener readContextListener = new ReadContextListener(
-            TEST_SEGMENT_FILE,
-            fileLocation,
-            completionListener,
-            threadPool,
-            UnaryOperator.identity(),
-            MAX_CONCURRENT_STREAMS
-        );
-        ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null);
-        readContextListener.onResponse(readContext);
-
-        countDownLatch.await();
-
-        assertTrue(Files.exists(fileLocation));
-        assertEquals(NUMBER_OF_PARTS * PART_SIZE, Files.size(fileLocation));
-    }
-
-    public void testReadContextListenerFailure() throws Exception {
-        Path fileLocation = path.resolve(UUID.randomUUID().toString());
-        List<ReadContext.StreamPartCreator> blobPartStreams = initializeBlobPartStreams();
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch);
-        ReadContextListener readContextListener = new ReadContextListener(
-            TEST_SEGMENT_FILE,
-            fileLocation,
-            completionListener,
-            threadPool,
-            UnaryOperator.identity(),
-            MAX_CONCURRENT_STREAMS
-        );
-        InputStream badInputStream = new InputStream() {
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                return read();
-            }
-
-            @Override
-            public int read() throws IOException {
-                throw new IOException();
-            }
-
-            @Override
-            public int available() {
-                return PART_SIZE;
-            }
-        };
-
-        blobPartStreams.add(
-            NUMBER_OF_PARTS,
-            () -> CompletableFuture.supplyAsync(
-                () -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS),
-                threadPool.generic()
-            )
-        );
-        ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null);
-        readContextListener.onResponse(readContext);
-
-        countDownLatch.await();
-        assertFalse(Files.exists(fileLocation));
-        assertFalse(Files.exists(readContextListener.getTmpFileLocation()));
-    }
-
-    public void testReadContextListenerException() {
-        Path fileLocation = path.resolve(UUID.randomUUID().toString());
-        CountingCompletionListener<String> listener = new CountingCompletionListener<>();
-        ReadContextListener readContextListener = new ReadContextListener(
-            TEST_SEGMENT_FILE,
-            fileLocation,
-            listener,
-            threadPool,
-            UnaryOperator.identity(),
-            MAX_CONCURRENT_STREAMS
-        );
-        IOException exception = new IOException();
-        readContextListener.onFailure(exception);
-        assertEquals(1, listener.getFailureCount());
-        assertEquals(exception, listener.getException());
-    }
-
-    public void testWriteToTempFile() throws Exception {
-        final String fileName = UUID.randomUUID().toString();
-        Path fileLocation = path.resolve(fileName);
-        List<ReadContext.StreamPartCreator> blobPartStreams = initializeBlobPartStreams();
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch);
-        ReadContextListener readContextListener = new ReadContextListener(
-            TEST_SEGMENT_FILE,
-            fileLocation,
-            completionListener,
-            threadPool,
-            UnaryOperator.identity(),
-            MAX_CONCURRENT_STREAMS
-        );
-        ByteArrayInputStream assertingStream = new ByteArrayInputStream(randomByteArrayOfLength(PART_SIZE)) {
-            @Override
-            public int read(byte[] b) throws IOException {
-                assertTrue("parts written to temp file location", Files.exists(readContextListener.getTmpFileLocation()));
-                return super.read(b);
-            }
-        };
-        blobPartStreams.add(
-            NUMBER_OF_PARTS,
-            () -> CompletableFuture.supplyAsync(
-                () -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS),
-                threadPool.generic()
-            )
-        );
-        ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null);
-        readContextListener.onResponse(readContext);
-
-        countDownLatch.await();
-        assertTrue(Files.exists(fileLocation));
-        assertFalse(Files.exists(readContextListener.getTmpFileLocation()));
-    }
-
-    public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception {
-        final String fileName = UUID.randomUUID().toString();
-        Path fileLocation = path.resolve(fileName);
-        // create an empty file at location.
-        Files.createFile(fileLocation);
-        assertEquals(0, Files.readAllBytes(fileLocation).length);
-        List<ReadContext.StreamPartCreator> blobPartStreams = initializeBlobPartStreams();
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch);
-        ReadContextListener readContextListener = new ReadContextListener(
-            TEST_SEGMENT_FILE,
-            fileLocation,
-            completionListener,
-            threadPool,
-            UnaryOperator.identity(),
-            MAX_CONCURRENT_STREAMS
-        );
-        ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null);
-        readContextListener.onResponse(readContext);
-
-        countDownLatch.await();
-        assertTrue(Files.exists(fileLocation));
-        assertEquals(50, Files.readAllBytes(fileLocation).length);
-        assertFalse(Files.exists(readContextListener.getTmpFileLocation()));
-    }
-
-    private List<ReadContext.StreamPartCreator> initializeBlobPartStreams() {
-        List<ReadContext.StreamPartCreator> blobPartStreams = new ArrayList<>();
-        for (int partNumber = 0; partNumber < NUMBER_OF_PARTS; partNumber++) {
-            InputStream testStream = new ByteArrayInputStream(randomByteArrayOfLength(PART_SIZE));
-            int finalPartNumber = partNumber;
-            blobPartStreams.add(
-                () -> CompletableFuture.supplyAsync(
-                    () -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE),
-                    threadPool.generic()
-                )
-            );
-        }
-        return blobPartStreams;
-    }
-}
diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java
index bbd73bcf97aab..97bc822be7d51 100644
--- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java
+++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java
@@ -258,10 +258,11 @@ private IndexService newIndexService(IndexModule module) throws IOException {
             writableRegistry(),
             () -> false,
             null,
-            new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE),
+            new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool),
             translogFactorySupplier,
             () -> IndexSettings.DEFAULT_REFRESH_INTERVAL,
-            () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
+            () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
+            DefaultRecoverySettings.INSTANCE
         );
     }
 
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
index 941f2f48e71af..5a13f57db2c87 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
@@ -33,7 +33,6 @@
 import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
-import org.opensearch.indices.recovery.DefaultRecoverySettings;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.threadpool.ThreadPool;
@@ -156,8 +155,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
             remoteMetadataDirectory,
             mock(RemoteStoreLockManager.class),
             mock(ThreadPool.class),
-            shardId,
-            DefaultRecoverySettings.INSTANCE
+            shardId
         );
         FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory(
             new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory)
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
index 78c7fe64cebd9..cad5e47531cc6 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
@@ -20,7 +20,6 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.shard.ShardPath;
-import org.opensearch.indices.recovery.DefaultRecoverySettings;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.RepositoryMissingException;
 import org.opensearch.repositories.blobstore.BlobStoreRepository;
@@ -58,11 +57,7 @@ public void setup() {
         repositoriesService = mock(RepositoriesService.class);
         threadPool = mock(ThreadPool.class);
         when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
-        remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
-            repositoriesServiceSupplier,
-            threadPool,
-            DefaultRecoverySettings.INSTANCE
-        );
+        remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool);
     }
 
     public void testNewDirectory() throws IOException {
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index 0f44d5c3b2f53..ca0a675d00b79 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -25,9 +25,7 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
-import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.common.io.VersionedCodecStreamWrapper;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
@@ -42,7 +40,6 @@
 import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
-import org.opensearch.indices.recovery.DefaultRecoverySettings;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.threadpool.ThreadPool;
 import org.junit.After;
@@ -51,18 +48,15 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.UnaryOperator;
 
 import org.mockito.Mockito;
 
@@ -71,8 +65,6 @@
 import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
 import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
 import static org.hamcrest.CoreMatchers.is;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -150,14 +142,12 @@ public void setup() throws IOException {
             remoteMetadataDirectory,
             mdLockManager,
             threadPool,
-            indexShard.shardId(),
-            DefaultRecoverySettings.INSTANCE
+            indexShard.shardId()
         );
         try (Store store = indexShard.store()) {
             segmentInfos = store.readLastCommittedSegmentsInfo();
         }
 
-        when(remoteDataDirectory.getDownloadRateLimiter()).thenReturn(UnaryOperator.identity());
         when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService);
         when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService);
     }
@@ -567,118 +557,6 @@ public void onFailure(Exception e) {}
         storeDirectory.close();
     }
 
-    public void testCopyFilesToMultipart() throws Exception {
-        String filename = "_0.cfe";
-        populateMetadata();
-        remoteSegmentStoreDirectory.init();
-
-        Directory storeDirectory = mock(Directory.class);
-        AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
-        when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer);
-
-        Mockito.doAnswer(invocation -> {
-            ActionListener<ReadContext> completionListener = invocation.getArgument(1);
-            final CompletableFuture<InputStreamContainer> future = new CompletableFuture<>();
-            future.complete(new InputStreamContainer(new ByteArrayInputStream(new byte[] { 42 }), 0, 1));
-            completionListener.onResponse(new ReadContext(1, List.of(() -> future), ""));
-            return null;
-        }).when(blobContainer).readBlobAsync(any(), any());
-
-        CountDownLatch downloadLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new ActionListener<>() {
-            @Override
-            public void onResponse(String unused) {
-                downloadLatch.countDown();
-            }
-
-            @Override
-            public void onFailure(Exception e) {}
-        };
-        Path path = createTempDir();
-        DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker();
-        long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename);
-        remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener);
-        assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS));
-        verify(blobContainer, times(1)).readBlobAsync(contains(filename), any());
-        verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any());
-
-        // Verify stats are updated to DirectoryFileTransferTracker
-        assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded());
-    }
-
-    public void testCopyFilesTo() throws Exception {
-        String filename = "_0.cfe";
-        populateMetadata();
-        remoteSegmentStoreDirectory.init();
-
-        Directory storeDirectory = mock(Directory.class);
-        CountDownLatch downloadLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new ActionListener<>() {
-            @Override
-            public void onResponse(String unused) {
-                downloadLatch.countDown();
-            }
-
-            @Override
-            public void onFailure(Exception e) {}
-        };
-        Path path = createTempDir();
-        remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
-        assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
-        verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
-    }
-
-    public void testCopyFilesToEmptyPath() throws Exception {
-        String filename = "_0.cfe";
-        populateMetadata();
-        remoteSegmentStoreDirectory.init();
-
-        Directory storeDirectory = mock(Directory.class);
-        AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
-        when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer);
-
-        CountDownLatch downloadLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new ActionListener<>() {
-            @Override
-            public void onResponse(String unused) {
-                downloadLatch.countDown();
-            }
-
-            @Override
-            public void onFailure(Exception e) {}
-        };
-        remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener);
-        assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
-        verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
-    }
-
-    public void testCopyFilesToException() throws Exception {
-        String filename = "_0.cfe";
-        populateMetadata();
-        remoteSegmentStoreDirectory.init();
-
-        Directory storeDirectory = mock(Directory.class);
-        Mockito.doThrow(new IOException())
-            .when(storeDirectory)
-            .copyFrom(any(Directory.class), anyString(), anyString(), any(IOContext.class));
-        CountDownLatch downloadLatch = new CountDownLatch(1);
-        ActionListener<String> completionListener = new ActionListener<>() {
-            @Override
-            public void onResponse(String unused) {
-
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                downloadLatch.countDown();
-            }
-        };
-        Path path = createTempDir();
-        remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
-        assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
-        verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
-    }
-
     public void testCopyFilesFromMultipartIOException() throws Exception {
         String filename = "_100.si";
         AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
@@ -688,8 +566,7 @@ public void testCopyFilesFromMultipartIOException() throws Exception {
             remoteMetadataDirectory,
             mdLockManager,
             threadPool,
-            indexShard.shardId(),
-            DefaultRecoverySettings.INSTANCE
+            indexShard.shardId()
         );
 
         populateMetadata();
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java
new file mode 100644
index 0000000000000..588d9e8bb13a2
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java
@@ -0,0 +1,119 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase {
+
+    private ThreadPool threadPool;
+    private Directory source;
+    private Directory destination;
+    private Directory secondDestination;
+    private RemoteStoreFileDownloader fileDownloader;
+    private Map<String, Integer> files = new HashMap<>();
+
+    @Before
+    public void setup() throws IOException {
+        final int streamLimit = randomIntBetween(1, 20);
+        final RecoverySettings recoverySettings = new RecoverySettings(
+            Settings.builder().put("indices.recovery.max_concurrent_remote_store_streams", streamLimit).build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+        threadPool = new TestThreadPool(getTestName());
+        source = new NIOFSDirectory(createTempDir());
+        destination = new NIOFSDirectory(createTempDir());
+        secondDestination = new NIOFSDirectory(createTempDir());
+        for (int i = 0; i < 10; i++) {
+            final String filename = "file_" + i;
+            final int content = randomInt();
+            try (IndexOutput output = source.createOutput(filename, IOContext.DEFAULT)) {
+                output.writeInt(content);
+            }
+            files.put(filename, content);
+        }
+        fileDownloader = new RemoteStoreFileDownloader(
+            ShardId.fromString("[RemoteStoreFileDownloaderTests][0]"),
+            threadPool,
+            recoverySettings
+        );
+    }
+
+    @After
+    public void stopThreadPool() throws Exception {
+        threadPool.shutdown();
+        assertTrue(threadPool.awaitTermination(5, TimeUnit.SECONDS));
+    }
+
+    public void testDownload() throws IOException {
+        fileDownloader.download(source, destination, files.keySet());
+        assertContent(files, destination);
+    }
+
+    public void testDownloadWithSecondDestination() throws IOException {
+        fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {});
+        assertContent(files, destination);
+        assertContent(files, secondDestination);
+    }
+
+    public void testDownloadWithFileCompletionHandler() throws IOException {
+        final AtomicInteger counter = new AtomicInteger(0);
+        fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet);
+        assertContent(files, destination);
+        assertEquals(files.size(), counter.get());
+    }
+
+    public void testDownloadNonExistentFile() {
+        assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, Set.of("not real")));
+    }
+
+    public void testDownloadExtraNonExistentFile() {
+        List<String> filesWithExtra = new ArrayList<>(files.keySet());
+        filesWithExtra.add("not real");
+        assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, filesWithExtra));
+    }
+
+    private static void assertContent(Map<String, Integer> expected, Directory destination) throws IOException {
+        // Note that Lucene will randomly write extra files (see org.apache.lucene.tests.mockfile.ExtraFS)
+        // so we just need to check that all the expected files are present but not that _only_ the expected
+        // files are present
+        final Set<String> actualFiles = Set.of(destination.listAll());
+        for (String file : expected.keySet()) {
+            assertTrue(actualFiles.contains(file));
+            try (IndexInput input = destination.openInput(file, IOContext.DEFAULT)) {
+                assertEquals(expected.get(file), Integer.valueOf(input.readInt()));
+                assertThrows(EOFException.class, input::readByte);
+            }
+        }
+    }
+}
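
Note: as the test setup above shows, the downloader's parallelism is tunable through a
dynamic cluster setting. A sketch of building RecoverySettings with an explicit stream
limit (the value 4 is an arbitrary example; the tests pick a random limit between 1 and 20):

    Settings settings = Settings.builder().put("indices.recovery.max_concurrent_remote_store_streams", 4).build();
    RecoverySettings recoverySettings = new RecoverySettings(settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
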
diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java
index 03f0d27188027..a831a01da620d 100644
--- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java
@@ -41,7 +41,6 @@
 import org.opensearch.common.blobstore.BlobStore;
 import org.opensearch.common.blobstore.fs.FsBlobContainer;
 import org.opensearch.common.blobstore.fs.FsBlobStore;
-import org.opensearch.common.blobstore.stream.read.ReadContext;
 import org.opensearch.common.blobstore.stream.write.WriteContext;
 import org.opensearch.common.compress.DeflateCompressor;
 import org.opensearch.common.io.Streams;
@@ -271,11 +270,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
             completionListener.onResponse(null);
         }
 
-        @Override
-        public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
-            throw new RuntimeException("read not supported");
-        }
-
         @Override
         public boolean remoteIntegrityCheckSupported() {
             return false;
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index 80731b378f369..97c5d23831965 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -2067,11 +2067,12 @@ public void onFailure(final Exception e) {
                     emptyMap(),
                     null,
                     emptyMap(),
-                    new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE),
+                    new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool),
                     repositoriesServiceReference::get,
                     fileCacheCleaner,
                     null,
-                    new RemoteStoreStatsTrackerFactory(clusterService, settings)
+                    new RemoteStoreStatsTrackerFactory(clusterService, settings),
+                    DefaultRecoverySettings.INSTANCE
                 );
                 final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
                 snapshotShardsService = new SnapshotShardsService(
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 186c1c7e78f6b..ffa37817e8a03 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -702,7 +702,7 @@ protected IndexShard newShard(
             remoteStoreStatsTrackerFactory,
             () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
             "dummy-node",
-            null
+            DefaultRecoverySettings.INSTANCE
         );
         indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
         if (remoteStoreStatsTrackerFactory != null) {
@@ -789,14 +789,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId
         RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager(
            new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex()))
         );
-        return new RemoteSegmentStoreDirectory(
-            dataDirectory,
-            metadataDirectory,
-            remoteStoreLockManager,
-            threadPool,
-            shardId,
-            DefaultRecoverySettings.INSTANCE
-        );
+        return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId);
     }
 
     private RemoteDirectory newRemoteDirectory(Path f) throws IOException {
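
Note: after this change the directory no longer sees RecoverySettings at all; download
concurrency lives behind IndexShard#getFileDownloader(). A sketch of the five-argument
construction used by the test-framework helper above (argument names follow the diff):

    RemoteSegmentStoreDirectory directory = new RemoteSegmentStoreDirectory(
        dataDirectory,          // RemoteDirectory holding segment data
        metadataDirectory,      // RemoteDirectory holding metadata files
        remoteStoreLockManager, // lock manager guarding commit files
        threadPool,
        shardId
    );
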