Skip to content

Commit

Permalink
Initial commit for POC
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Jul 26, 2023
1 parent 57d5e90 commit f01530b
Show file tree
Hide file tree
Showing 14 changed files with 659 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush)
}
maxSeqNoRefreshedOrFlushed = maxSeqNo;
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
int numberOfOperations = randomIntBetween(1, 5);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc();
maxSeqNo = response.getSeqNo();
Expand All @@ -112,7 +112,7 @@ private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
}

private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
protected void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
Expand All @@ -129,12 +129,10 @@ private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boo

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);
// assertEquals(indexStats.get(TOTAL_OPERATIONS).longValue(),
// client().prepareSearch(INDEX_NAME).setSize(0).get().getInternalResponse().hits().getTotalHits().value);
verifyRestoredData(indexStats, remoteTranslog);

if (remoteTranslog) {
verifyRestoredData(indexStats, true);
} else {
verifyRestoredData(indexStats, false);
}
}

public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
Expand All @@ -29,10 +30,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected void putRepository(Path path) {
logger.error("Repo Path: {}", path);
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(Settings.builder().put("location", path))
);
}

    @Override
    public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
        // Overrides the base test to run the restore flow against the multipart mock repository.
        // Arguments are (remoteTranslog=true, numberOfIterations=2, invokeFlush=true).
        // NOTE(review): invokeFlush=true looks inconsistent with the "NoDataPostCommit" test name —
        // confirm these POC arguments match the scenario exercised by the base class.
        testRestoreFlow(true, 2, true);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@

package org.opensearch.remotestore.multipart.mocks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -34,14 +40,15 @@ public class MockFsVerifyingBlobContainer extends FsBlobContainer implements Ver

private final boolean triggerDataIntegrityFailure;

private static final Logger logger = LogManager.getLogger(MockFsVerifyingBlobContainer.class);

    /**
     * Construct a mock blob container backed by the local filesystem.
     *
     * @param blobStore the owning FS blob store
     * @param blobPath logical path of this container within the store
     * @param path filesystem location backing the container
     * @param triggerDataIntegrityFailure when true, segment-file transfers complete with a simulated
     *        data-integrity failure instead of succeeding
     */
    public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) {
        super(blobStore, blobPath, path);
        this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
    }

@Override
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {

int nParts = 10;
long partSize = writeContext.getFileSize() / nParts;
StreamContext streamContext = writeContext.getStreamProvider(partSize);
Expand Down Expand Up @@ -114,6 +121,81 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

@Override
public void asyncBlobDownload(ReadContext readContext, ActionListener<Void> completionListener) throws IOException {
int nParts = 10;
long partSize = readContext.getFileSize() / nParts;
StreamContext streamContext = readContext.getStreamProvider(partSize);
Directory directory = readContext.getLocalDirectory();

byte[] buffer = new byte[(int) readContext.getFileSize()];
AtomicLong totalContentRead = new AtomicLong();
CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
int finalPartIdx = partIdx;
Thread thread = new Thread(() -> {
try {
InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
InputStream inputStream = inputStreamContainer.getInputStream();
long remainingContentLength = inputStreamContainer.getContentLength();
long offset = partSize * finalPartIdx;
while (remainingContentLength > 0) {
int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
totalContentRead.addAndGet(readContentLength);
remainingContentLength -= readContentLength;
offset += readContentLength;
}
inputStream.close();
} catch (IOException e) {
completionListener.onFailure(e);
} finally {
latch.countDown();
}
});
thread.start();
}
try {
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out waiting for file transfer to complete for " + readContext.getFileName());
}
} catch (InterruptedException e) {
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + readContext.getFileName());
}
logger.error("Buffer: {}", buffer);
try (IndexOutput output = directory.createOutput(readContext.getFileName(), IOContext.DEFAULT)) {
output.writeBytes(buffer, buffer.length);
}
if (readContext.getFileSize() != totalContentRead.get()) {
throw new IOException(
"Incorrect content length read for file "
+ readContext.getFileName()
+ ", actual file size: "
+ readContext.getFileSize()
+ ", bytes read: "
+ totalContentRead.get()
);
}

try {
// bulks need to succeed for segment files to be generated
if (isSegmentFile(readContext.getFileName()) && triggerDataIntegrityFailure) {
completionListener.onFailure(
new RuntimeException(
new CorruptIndexException(
"Data integrity check failure for file: " + readContext.getFileName(),
readContext.getFileName()
)
)
);
} else {
readContext.getDownloadFinalizer().accept(true);
completionListener.onResponse(null);
}
} catch (Exception e) {
completionListener.onFailure(e);
}
}

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.common.blobstore;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;

import java.io.IOException;
Expand All @@ -31,4 +32,15 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Reads blob content from multiple streams, each from a specific part of the file, which is provided by the
* StreamContextSupplier in the WriteContext passed to this method. An {@link IOException} is thrown if reading
* any of the input streams fails, or writing to the target blob fails
*
* @param writeContext A WriteContext object encapsulating all information needed to perform the upload
* @param completionListener Listener on which upload events should be published.
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobDownload(ReadContext writeContext, ActionListener<Void> completionListener) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import org.apache.lucene.store.Directory;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;

import java.io.IOException;

/**
 * ReadContext is used to encapsulate all data needed by
 * {@code VerifyingMultiStreamBlobContainer#asyncBlobDownload} to download a blob as
 * multiple part streams and assemble it into a local Lucene {@link Directory}.
 *
 * @opensearch.internal
 */
public class ReadContext {

    // Name of the file as written into the local directory.
    private final String fileName;
    // Name of the blob in the remote store.
    private final String remoteFileName;
    private final StreamContextSupplier streamContextSupplier;
    private final long fileSize;
    private final boolean failIfAlreadyExists;
    // Invoked after all parts transfer; NOTE(review): the mock container passes true on success —
    // confirm the intended contract for the Boolean argument.
    private final CheckedConsumer<Boolean, IOException> downloadFinalizer;
    private final boolean doRemoteDataIntegrityCheck;
    private final Long expectedChecksum;
    // Mutable by design: the target directory is attached via setLocalDirectory() after construction.
    private Directory localDirectory;

    /**
     * Construct a new ReadContext object
     *
     * @param remoteFileName The name of the remote blob being downloaded
     * @param fileName The name of the local file being written
     * @param streamContextSupplier A supplier that will provide StreamContext to the plugin
     * @param fileSize The total size of the file being downloaded
     * @param failIfAlreadyExists A boolean to fail the download if the file already exists
     * @param downloadFinalizer Callback invoked once the transfer of all parts completes
     * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
     * @param expectedChecksum This parameter expected only when the vendor plugin is expected to do server side data integrity verification
     */
    public ReadContext(
        String remoteFileName,
        String fileName,
        StreamContextSupplier streamContextSupplier,
        long fileSize,
        boolean failIfAlreadyExists,
        CheckedConsumer<Boolean, IOException> downloadFinalizer,
        boolean doRemoteDataIntegrityCheck,
        @Nullable Long expectedChecksum
    ) {
        this.remoteFileName = remoteFileName;
        this.fileName = fileName;
        this.streamContextSupplier = streamContextSupplier;
        this.fileSize = fileSize;
        this.failIfAlreadyExists = failIfAlreadyExists;
        this.downloadFinalizer = downloadFinalizer;
        this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
        this.expectedChecksum = expectedChecksum;
    }

    /**
     * Attaches the local directory the downloaded file will be written into.
     */
    public void setLocalDirectory(Directory directory) {
        this.localDirectory = directory;
    }

    /**
     * @return The local directory targeted by this download, or null if not yet set
     */
    public Directory getLocalDirectory() {
        return this.localDirectory;
    }

    /**
     * @return The local file name
     */
    public String getFileName() {
        return fileName;
    }

    /**
     * @return The name of the blob in the remote store
     */
    public String getRemoteFileName() {
        return remoteFileName;
    }

    /**
     * @return The boolean representing whether to fail the file download if it exists
     */
    public boolean isFailIfAlreadyExists() {
        return failIfAlreadyExists;
    }

    /**
     * @param partSize The size of a single part to be downloaded
     * @return The stream context which will be used by the plugin to initialize streams for the file
     */
    public StreamContext getStreamProvider(long partSize) {
        return streamContextSupplier.supplyStreamContext(partSize);
    }

    /**
     * @return The total size of the file
     */
    public long getFileSize() {
        return fileSize;
    }

    /**
     * @return The download finalizer for this download
     */
    public CheckedConsumer<Boolean, IOException> getDownloadFinalizer() {
        return downloadFinalizer;
    }

    /**
     * @return A boolean for whether remote data integrity check has to be done for this download or not
     */
    public boolean doRemoteDataIntegrityCheck() {
        return doRemoteDataIntegrityCheck;
    }

    /**
     * @return The CRC32 checksum associated with this file
     */
    public Long getExpectedChecksum() {
        return expectedChecksum;
    }
}
Loading

0 comments on commit f01530b

Please sign in to comment.