Skip to content

Commit

Permalink
Remove unused code
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Aug 8, 2023
1 parent a37e894 commit e13a999
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,86 +134,28 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
@Override
public CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException {

ExecutorService executorService = Executors.newFixedThreadPool(4);
InputStream inputStream = readBlob(blobName);
long blobSize = inputStream.available();
int numStreams = 5;
inputStream.close();
final int numStreams = forceSingleStream ? 1 : 5;
final ExecutorService executorService = Executors.newFixedThreadPool(numStreams);

final List<InputStream> blobInputStreams = new ArrayList<>();
// Fetch blob metadata
final InputStream blobInputStream = readBlob(blobName);
final long blobSize = blobInputStream.available();
blobInputStream.close();

// Create input streams for the blob
final List<InputStream> blobInputStreams = new ArrayList<>();
long streamSize = (int) Math.ceil(blobSize * 1.0 / numStreams);
for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
long start = streamNumber * streamSize;
blobInputStreams.add(readBlob(blobName, start, streamSize));
}

CompletableFuture<ReadContext> readContextFuture =
CompletableFuture.supplyAsync(() -> new ReadContext(blobInputStreams, null, numStreams, blobSize), executorService);
CompletableFuture<ReadContext> readContextFuture = CompletableFuture.supplyAsync(
() -> new ReadContext(blobInputStreams, null, numStreams, blobSize),
executorService
);
executorService.shutdown();
return readContextFuture;

// int nParts = 10;
// long partSize = readContext.getFileSize() / nParts;
// StreamContext streamContext = readContext.getStreamProvider(partSize);
// Directory directory = readContext.getLocalDirectory();
//
// byte[] buffer = new byte[(int) readContext.getFileSize()];
// AtomicLong totalContentRead = new AtomicLong();
// CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
// for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
// int finalPartIdx = partIdx;
// Thread thread = new Thread(() -> {
// try {
// InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
// InputStream inputStream = inputStreamContainer.getInputStream();
// long remainingContentLength = inputStreamContainer.getContentLength();
// long offset = partSize * finalPartIdx;
// while (remainingContentLength > 0) {
// int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
// totalContentRead.addAndGet(readContentLength);
// remainingContentLength -= readContentLength;
// offset += readContentLength;
// }
// inputStream.close();
// } catch (IOException e) {
// completionListener.onFailure(e);
// } finally {
// latch.countDown();
// }
// });
// thread.start();
// }
// try {
// if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
// throw new IOException("Timed out waiting for file transfer to complete for " + readContext.getFileName());
// }
// } catch (InterruptedException e) {
// throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + readContext.getFileName());
// }
// logger.error("Buffer: {}", buffer);
// try (IndexOutput output = directory.createOutput(readContext.getFileName(), IOContext.DEFAULT)) {
// output.writeBytes(buffer, buffer.length);
// }
//
// try {
// // bulks need to succeed for segment files to be generated
// if (isSegmentFile(readContext.getFileName()) && triggerDataIntegrityFailure) {
// completionListener.onFailure(
// new RuntimeException(
// new CorruptIndexException(
// "Data integrity check failure for file: " + readContext.getFileName(),
// readContext.getFileName()
// )
// )
// );
// } else {
// readContext.getDownloadFinalizer().accept(true);
// completionListener.onResponse(null);
// }
// } catch (Exception e) {
// completionListener.onFailure(e);
// }
}

private boolean isSegmentFile(String filename) {
Expand Down
65 changes: 0 additions & 65 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2328,71 +2328,6 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
+ "] but got "
+ getRetentionLeases();

/* ActionListener<Void> listener = new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
try {
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initializing remote translog we cannot skip downloading translog files,
// so before that step, we are deleting the translog files present in remote store.
deleteTranslogFilesFromRemoteTranslog();
}
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// we can flush if we go idle after some time and become inactive.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference,
// during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
assert assertSequenceNumbersInCommit();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
} catch (IOException e) {
onFailure(e);
throw new UncheckedIOException(e);
}
}
@Override
public void onFailure(Exception e) {
}
};
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> latchedListener = new LatchedActionListener<>(listener, latch);
synchronized (engineMutex) {
if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
syncSegmentsFromRemoteSegmentStore(false, true, true, latchedListener);
} else {
listener.onResponse(null);
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
*/
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
Expand Down
32 changes: 0 additions & 32 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,38 +531,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
store.incRef();
remoteStore.incRef();
try {
// ActionListener listener = new ActionListener() {
// @Override
// public void onResponse(Object o) {
// try {
// if (store.directory().listAll().length == 0) {
// store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
// }
// if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
// indexShard.syncTranslogFilesFromRemoteTranslog();
// } else {
// bootstrap(indexShard, store);
// }
//
// assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
// indexShard.recoveryState().getIndex().setFileDetailsComplete();
// indexShard.openEngineAndRecoverFromTranslog();
// indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
// indexShard.finalizeRecovery();
// indexShard.postRecovery("post recovery from remote_store");
// } catch (IOException e) {
// throw new UncheckedIOException(e);
// } finally {
// store.decRef();
// remoteStore.decRef();
// }
// }
//
// @Override
// public void onFailure(Exception e) {
//
// }
// };
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);

Expand Down

0 comments on commit e13a999

Please sign in to comment.