Skip to content

Commit

Permalink
Remove unwanted code
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Aug 14, 2023
1 parent c0cedd4 commit 602f0fa
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 59 deletions.
21 changes: 16 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -4787,9 +4788,10 @@ private String copySegmentFiles(
boolean overrideLocal,
ActionListener<Void> completionListener
) throws IOException {
Set<String> toDownloadSegments = new HashSet<>();
Set<String> downloadedSegments = new HashSet<>();
Set<String> skippedSegments = new HashSet<>();
Set<String> toDownloadSegments = Collections.synchronizedSet(new HashSet<>());
Set<String> downloadedSegments = Collections.synchronizedSet(new HashSet<>());
Set<String> skippedSegments = Collections.synchronizedSet(new HashSet<>());

String segmentNFile = null;

try {
Expand Down Expand Up @@ -4818,6 +4820,7 @@ private String copySegmentFiles(
@Override
public void onResponse(String fileName) {
try {
logger.error("[MultiStream] Completed download for file: {}", fileName);
downloadedSegments.add(fileName);
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT);
Expand All @@ -4842,8 +4845,16 @@ public void onFailure(Exception e) {
completionListener.onResponse(null);
}

toDownloadSegments.forEach(file ->
sourceRemoteDirectory.copyTo(storeDirectory, file, uploadedSegments.get(file).getLength(), IOContext.DEFAULT, filesDownloadListener));
toDownloadSegments.forEach(file -> {
logger.error("[MultiStream] Starting download for file: {}", file);
// try {
// threadPool.generic()
// .submit(() -> sourceRemoteDirectory.copyTo(storeDirectory, file, uploadedSegments.get(file).getLength(), IOContext.DEFAULT, filesDownloadListener)).get();
// } catch (InterruptedException | ExecutionException e) {
// throw new RuntimeException(e);
// }
sourceRemoteDirectory.copyTo(storeDirectory, file, uploadedSegments.get(file).getLength(), IOContext.DEFAULT, filesDownloadListener);
});

} finally {
logger.info("Downloaded segments here: {}", downloadedSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
Expand Down Expand Up @@ -429,31 +431,25 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
*/
public void copyTo(Directory to, String src, long blobLength, IOContext context, ActionListener<String> fileListener) {
downloadBlob(to, src, blobLength, context, fileListener);
// CompletableFuture<Void> future = downloadBlob(to, src, context)
// .exceptionally(throwable -> {
// logger.warn(() -> new ParameterizedMessage("Exception while downloading file {} to the local segment store", src), throwable);
// return null;
// });
// return future;
}

private void downloadBlob(Directory to, String localFileName, long blobSize, IOContext ioContext, ActionListener<String> fileListener) {
final String remoteFileName = getExistingRemoteFilename(localFileName);
assert remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer;
VerifyingMultiStreamBlobContainer blobContainer = (VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();

List<String> partFileNames = new ArrayList<>();
final long optimalStreamSize = 8; // TODO: Replace this with configurable value
final String remoteFileName = getExistingRemoteFilename(localFileName);
final List<String> partFileNames = new ArrayList<>();
final long optimalStreamSize = 8; //blobContainer.readBlobPreferredLength();
final int numStreams = (int) Math.ceil(blobSize * 1.0 / optimalStreamSize);

AtomicInteger atomicInteger = new AtomicInteger(numStreams);

List<CompletableFuture<Void>> futureStreamDownloads = new ArrayList<>();

for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
long start = streamNumber * optimalStreamSize;
long end = Math.min(blobSize, ((streamNumber + 1) * optimalStreamSize));
long length = end - start;

int finalStreamNumber = streamNumber;
String partFileName = localFileName + "__" + Integer.toString(finalStreamNumber);
final String partFileName = localFileName + "__" + streamNumber;
partFileNames.add(partFileName);

ActionListener<InputStream> streamListener = new ActionListener<>() {
Expand All @@ -467,7 +463,10 @@ public void onResponse(InputStream inputStream) {
} catch (IOException ex) {
// TODO: Stream error handling
}

logger.error("[MultiStream] Finished stream {} for file {}", partFileName, localFileName);
atomicInteger.decrementAndGet();

if(atomicInteger.get() == 0) {
postDownload(to, localFileName, ioContext, partFileNames);
fileListener.onResponse(localFileName);
Expand All @@ -480,49 +479,20 @@ public void onFailure(Exception e) {
}
};

try {
((VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer()).readBlobAsync(remoteFileName, start, length, streamListener);
} catch (IOException e) {
throw new RuntimeException(e);
}
}


// TODO: Change threadpool to something appropriate
logger.error("[MultiStream] Starting stream {} for file {}", streamNumber, localFileName);
CompletableFuture<Void> futureStreamDownload = CompletableFuture.runAsync(() -> {
try {
blobContainer.readBlobAsync(remoteFileName, start, length, streamListener);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, threadPool.generic());
futureStreamDownloads.add(futureStreamDownload);
}

// Code for future
// PlainActionFuture<ReadContext> completionListener = new PlainActionFuture<>();
// try {
// ((VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer()).asyncBlobDownload(remoteFileName, false, completionListener);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
//
// CompletableFuture<ReadContext> readContextFuture = CompletableFuture.supplyAsync(completionListener::actionGet);
//
//
// CompletableFuture<Void> completableFuture = readContextFuture.thenCompose(readContext -> CompletableFuture.runAsync(() -> {
// List<InputStream> blobInputStreams = readContext.getBlobInputStreams();
// List<String> partFileNames = new ArrayList<>();
// for (int streamNumber = 0; streamNumber < blobInputStreams.size(); streamNumber++) {
//
//
// try (InputStream inputStream = blobInputStreams.get(streamNumber);
// IndexOutput indexOutput = to.createTempOutput(localFileName, Integer.toString(streamNumber), ioContext)){
//
// byte[] buffer = new byte[inputStream.available()];
// while ((inputStream.read(buffer)) != -1) {
// indexOutput.writeBytes(buffer, 0, buffer.length);
// }
//
// partFileNames.add(indexOutput.getName());
// } catch (IOException e) {
// // TODO: Error handling
// }
// }
// postDownload(to, localFileName, ioContext, partFileNames);
// }, threadPool.generic()));
//
// return completableFuture;
CompletableFuture.allOf(futureStreamDownloads.toArray(new CompletableFuture[0])).join();
}

private void postDownload(Directory segmentDirectory, String fileName, IOContext ioContext, List<String> partFileNames) {
Expand Down

0 comments on commit 602f0fa

Please sign in to comment.