Skip to content

Commit

Permalink
Add file channel support for writes instead of merges
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Aug 23, 2023
1 parent 7ae816a commit 6320967
Show file tree
Hide file tree
Showing 12 changed files with 296 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -212,7 +213,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}

@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
// Multi-stream (async) reads are not implemented for the S3 container yet;
// the listener is never invoked — callers get an immediate exception instead.
throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ public void testAsyncBlobDownload() {

final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.readBlobAsync(blobName, 0, 0, new PlainActionFuture<>());
blobContainer.readBlobAsync(blobName, new PlainActionFuture<>());
});

assertEquals("S3 BlobContainer currently does not support async blob downloads", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
package org.opensearch.remotestore.multipart.mocks;

import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -115,16 +117,36 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}

@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
new Thread(() -> {
try {
listener.onResponse(readBlob(blobName, position, length));
long blobSize = listBlobs().get(blobName).length();
long partSize = blobSize / 10;
ReadContext readContext = supplyReadContext(partSize, blobSize, blobName);
listener.onResponse(readContext);
} catch (Exception e) {
listener.onFailure(e);
}
}).start();
}

ReadContext supplyReadContext(long partSize, long contentLength, String blobName) {
    // Split contentLength into fixed-size parts; the final part carries the
    // remainder (or a full part when the length divides evenly).
    long remainder = contentLength % partSize;
    long lastPartSize = (remainder == 0) ? partSize : remainder;
    // Ceiling division: number of parts needed to cover the whole blob.
    int numberOfParts = (int) ((contentLength + partSize - 1) / partSize);
    return new ReadContext(getTransferPartStreamSupplier(blobName), partSize, lastPartSize, numberOfParts, null);
}

/**
 * Builds the per-part stream supplier handed to {@link ReadContext}: given a part
 * number, size, and position, it opens a stream over that slice of the blob.
 * The original wrapped the body in a try/catch that caught {@code IOException}
 * only to rethrow it unchanged — the function type already declares
 * {@code IOException}, so the catch block added nothing and has been removed.
 */
private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException> getTransferPartStreamSupplier(String blobName) {
    return (partNo, size, position) -> {
        InputStream inputStream = readBlob(blobName, position, size);
        return new InputStreamContainer(inputStream, size, position);
    };
}

// A file is treated as a segment file unless it belongs to the translog
// (.tlog data or .ckp checkpoint).
private boolean isSegmentFile(String filename) {
    boolean isTranslogRelated = filename.endsWith(".tlog") || filename.endsWith(".ckp");
    return !isTranslogRelated;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,15 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.blobstore.stream.listener.FileCompletionListener;
import org.opensearch.common.blobstore.stream.listener.StreamCompletionListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand All @@ -42,70 +38,25 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Creates an async callback of an {@link java.io.InputStream} for the specified blob within the container.
* An {@link IOException} is thrown if requesting the input stream fails.
* Creates an async callback of a {@link ReadContext} for the specified blob within the container.
* An {@link IOException} is thrown if requesting the blob fetch fails.
* @param blobName The name of the blob to get a {@link ReadContext} for.
* @param position The position in the blob where the next byte will be read.
* @param length An indication of the number of bytes to be read.
* @param listener Async listener for {@link InputStream} object which serves the input streams and other metadata for the blob
* @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
*/
void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener);

/**
* Fetches the checksum for the blob stored within the repository implementation
* @param blobName The name of the blob for which repository checksum is needed
* @return checksum string of the blob
* @throws IOException in case of any issues with the blob
*/
default String blobChecksum(String blobName) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Downloads the blob to the given path location using an async mechanism
* @param blobName The name of the blob which is to be fetched from the container
* @param segmentFileLocation The location where the blob needs to be downloaded
* @param segmentCompletionListener Async listener for notification of completion or failure
*/
default void asyncBlobDownload(String blobName, Path segmentFileLocation, ActionListener<String> segmentCompletionListener) {
try {
final long segmentSize = listBlobs().get(blobName).length();
final long optimalStreamSize = readBlobPreferredLength();
final int numStreams = (int) Math.ceil(segmentSize * 1.0 / optimalStreamSize);

final AtomicBoolean anyStreamFailed = new AtomicBoolean();
final List<String> partFileNames = Collections.synchronizedList(new ArrayList<>());

final String segmentFileName = segmentFileLocation.getFileName().toString();
final Path segmentDirectory = segmentFileLocation.getParent();

final FileCompletionListener fileCompletionListener = new FileCompletionListener(
numStreams,
segmentFileName,
segmentDirectory,
partFileNames,
anyStreamFailed,
segmentCompletionListener
);

for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
String partFileName = UUID.randomUUID().toString();
long start = streamNumber * optimalStreamSize;
long end = Math.min(segmentSize, ((streamNumber + 1) * optimalStreamSize));
long length = end - start;
partFileNames.add(partFileName);

final StreamCompletionListener streamCompletionListener = new StreamCompletionListener(
partFileName,
segmentDirectory,
anyStreamFailed,
fileCompletionListener
);
readBlobAsync(blobName, start, length, streamCompletionListener);
}
} catch (Exception e) {
segmentCompletionListener.onFailure(e);
}

void readBlobAsync(String blobName, ActionListener<ReadContext> listener);

/**
 * Downloads the blob to the given path location using an async mechanism.
 * Delegates to {@link #readBlobAsync} with a {@code ReadContextListener} that
 * receives the resulting {@code ReadContext}.
 * @param blobName The name of the blob which is to be fetched from the container
 * @param segmentFileLocation The location where the blob needs to be downloaded
 * @param threadPool Thread pool handed to the {@code ReadContextListener}
 *                   (presumably for writing the fetched streams to disk — confirm in ReadContextListener)
 * @param segmentCompletionListener Async listener for notification of completion or failure
 */
default void asyncBlobDownload(
String blobName,
Path segmentFileLocation,
ThreadPool threadPool,
ActionListener<String> segmentCompletionListener
) {
ReadContextListener readContextListener = new ReadContextListener(
blobName,
segmentFileLocation,
threadPool,
segmentCompletionListener
);
readBlobAsync(blobName, readContextListener);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.StreamContext;
import org.opensearch.common.io.InputStreamContainer;

import java.io.IOException;

/**
* ReadContext is used to encapsulate all data needed by <code>BlobContainer#readStreams</code>
*
* @opensearch.internal
*/
public class ReadContext extends StreamContext {
// Checksum of the blob as stored in the repository. May be null: at least
// one implementation in this change constructs a ReadContext with a null
// checksum, so callers must not assume it is present.
private final String blobChecksum;

/**
 * Creates a ReadContext describing how to stream a blob in parts.
 * @param streamSupplier supplies an InputStreamContainer for a given
 *                       (part number, size, position) triple
 * @param partSize size of each full part in bytes
 * @param lastPartSize size of the final part (equals partSize when the
 *                     content length divides evenly)
 * @param numberOfParts total number of parts covering the blob
 * @param blobChecksum repository checksum of the blob, or null if unavailable
 */
public ReadContext(
CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException> streamSupplier,
long partSize,
long lastPartSize,
int numberOfParts,
String blobChecksum
) {
super(streamSupplier, partSize, lastPartSize, numberOfParts);
this.blobChecksum = blobChecksum;
}

/** Returns the repository checksum for this blob, or null when none was supplied. */
public String getBlobChecksum() {
return blobChecksum;
}
}
Loading

0 comments on commit 6320967

Please sign in to comment.