Add file channel support for writes instead of merges
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
kotwanikunal committed Aug 22, 2023
1 parent 7ae816a commit c2da08d
Showing 9 changed files with 114 additions and 231 deletions.
@@ -69,6 +69,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
@@ -212,7 +213,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}

@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads");
}
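
The interface change above replaces the positional (position, length) stream read with a listener that receives a ReadContext. A minimal sketch, not part of this commit, of how a caller might consume the new contract; it reuses names that appear elsewhere in this diff (PlainActionFuture, getNumberOfParts, provideStream), and the blob name is hypothetical:

    PlainActionFuture<ReadContext> future = new PlainActionFuture<>();
    blobContainer.readBlobAsync("example-blob", future);
    ReadContext readContext = future.actionGet();
    for (int partNumber = 0; partNumber < readContext.getNumberOfParts(); partNumber++) {
        InputStreamContainer part = readContext.provideStream(partNumber);
        // each part carries its own stream plus the offset and length needed to place it
        try (InputStream inputStream = part.getInputStream()) {
            // read part.getContentLength() bytes belonging at offset part.getOffset()
        }
    }

For the S3 container the method still throws UnsupportedOperationException, as the test below verifies.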

@@ -889,7 +889,7 @@ public void testAsyncBlobDownload() {

final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.readBlobAsync(blobName, 0, 0, new PlainActionFuture<>());
blobContainer.readBlobAsync(blobName, new PlainActionFuture<>());
});

assertEquals("S3 BlobContainer currently does not support async blob downloads", e.getMessage());
@@ -9,11 +9,13 @@
package org.opensearch.remotestore.multipart.mocks;

import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
@@ -115,16 +117,36 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}

@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
new Thread(() -> {
try {
listener.onResponse(readBlob(blobName, position, length));
long blobSize = listBlobs().get(blobName).length();
long partSize = blobSize / 10;
ReadContext readContext = supplyReadContext(partSize, blobSize, blobName);
listener.onResponse(readContext);
} catch (Exception e) {
listener.onFailure(e);
}
}).start();
}

ReadContext supplyReadContext(long partSize, long contentLength, String blobName) {
long lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize;
int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
return new ReadContext(getTransferPartStreamSupplier(blobName), partSize, lastPartSize, numberOfParts, null);
}

private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException> getTransferPartStreamSupplier(String blobName) {
// the try/catch that only rethrew IOException is dropped; the lambda already declares it
return ((partNo, size, position) -> {
InputStream inputStream = readBlob(blobName, position, size);
return new InputStreamContainer(inputStream, size, position);
});
}

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
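
The arithmetic in supplyReadContext is a ceiling division, with the remainder (when non-zero) becoming the final short part. A worked illustration with hypothetical values, not taken from the commit:

    // contentLength = 1005, partSize = 100
    // lastPartSize  = 1005 % 100 = 5            -> the 5-byte tail becomes the last part
    // numberOfParts = (1005 / 100) + 1 = 11     -> ten full parts plus the tail
    // exact multiple: contentLength = 1000 gives lastPartSize = 100, numberOfParts = 10

Note that the mock derives partSize as blobSize / 10, so it implicitly assumes blobs of at least 10 bytes; a smaller blob would yield partSize = 0 and make the modulo above throw ArithmeticException.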
@@ -8,19 +8,17 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.blobstore.stream.listener.FileCompletionListener;
import org.opensearch.common.blobstore.stream.listener.StreamCompletionListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.nio.file.StandardOpenOption;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
@@ -42,70 +40,48 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Creates an async callback of an {@link java.io.InputStream} for the specified blob within the container.
* An {@link IOException} is thrown if requesting the input stream fails.
* Creates an async callback of a {@link ReadContext} for the specified blob within the container.
* An {@link IOException} is thrown if requesting the blob fetch fails.
* @param blobName The name of the blob to get an {@link InputStream} for.
* @param position The position in the blob where the next byte will be read.
* @param length An indication of the number of bytes to be read.
* @param listener Async listener for {@link InputStream} object which serves the input streams and other metadata for the blob
* @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
*/
void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener);
void readBlobAsync(String blobName, ActionListener<ReadContext> listener);

/**
* Fetches the checksum for the blob stored within the repository implementation
* @param blobName The name of the blob for which repository checksum is needed
* @return checksum string of the blob
* @throws IOException in case of any issues with the blob
*/
default String blobChecksum(String blobName) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Downloads the blob to the given path location using an async mechanism
* @param blobName The name of the blob which is to be fetched from the container
* @param segmentFileLocation The location where the blob needs to be downloaded
* @param segmentCompletionListener Async listener for notification of completion or failure
*/
default void asyncBlobDownload(String blobName, Path segmentFileLocation, ActionListener<String> segmentCompletionListener) {
try {
final long segmentSize = listBlobs().get(blobName).length();
final long optimalStreamSize = readBlobPreferredLength();
final int numStreams = (int) Math.ceil(segmentSize * 1.0 / optimalStreamSize);

final AtomicBoolean anyStreamFailed = new AtomicBoolean();
final List<String> partFileNames = Collections.synchronizedList(new ArrayList<>());

final String segmentFileName = segmentFileLocation.getFileName().toString();
final Path segmentDirectory = segmentFileLocation.getParent();
ActionListener<ReadContext> readBlobListener = new ActionListener<>() {
@Override
public void onResponse(ReadContext readContext) {
int numParts = readContext.getNumberOfParts();
for (int partNumber = 0; partNumber < numParts; partNumber++) {
try (
FileChannel fileChannel = FileChannel.open(
segmentFileLocation,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE
)
) {
InputStreamContainer inputStreamContainer = readContext.provideStream(partNumber);
long offset = inputStreamContainer.getOffset();
long partSize = inputStreamContainer.getContentLength();
try (InputStream inputStream = inputStreamContainer.getInputStream()) {
fileChannel.transferFrom(Channels.newChannel(inputStream), offset, partSize);
}
} catch (IOException e) {
segmentCompletionListener.onFailure(e);
}
}
}

final FileCompletionListener fileCompletionListener = new FileCompletionListener(
numStreams,
segmentFileName,
segmentDirectory,
partFileNames,
anyStreamFailed,
segmentCompletionListener
);
@Override
public void onFailure(Exception e) {
segmentCompletionListener.onFailure(e);
}
};

for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
String partFileName = UUID.randomUUID().toString();
long start = streamNumber * optimalStreamSize;
long end = Math.min(segmentSize, ((streamNumber + 1) * optimalStreamSize));
long length = end - start;
partFileNames.add(partFileName);

final StreamCompletionListener streamCompletionListener = new StreamCompletionListener(
partFileName,
segmentDirectory,
anyStreamFailed,
fileCompletionListener
);
readBlobAsync(blobName, start, length, streamCompletionListener);
}
readBlobAsync(blobName, readBlobListener);
} catch (Exception e) {
segmentCompletionListener.onFailure(e);
}

}
}
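
This rewritten default method is where the commit title lands: rather than streaming each part into its own temporary part file and merging them afterwards (the deleted FileCompletionListener/StreamCompletionListener path below), every part is written directly into the destination file at its own offset via FileChannel.transferFrom, so no merge step is needed and out-of-order part completion is safe. A minimal usage sketch, with an assumed blob name and destination path:

    PlainActionFuture<String> downloadListener = new PlainActionFuture<>();
    verifyingContainer.asyncBlobDownload("example-blob", Path.of("/tmp/example-blob"), downloadListener);
    // on any part failure the listener's onFailure fires; otherwise each part lands
    // at its offset in the single destination file, with no temporary part files

Opening the FileChannel with CREATE and WRITE per part is what lets each part position itself independently within the same file.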

This file was deleted.

This file was deleted.
