diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java
index 3025265559034..3b7beb4712a79 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java
@@ -134,86 +134,28 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

     @Override
     public CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException {
-        ExecutorService executorService = Executors.newFixedThreadPool(4);
-        InputStream inputStream = readBlob(blobName);
-        long blobSize = inputStream.available();
-        int numStreams = 5;
-        inputStream.close();
+        final int numStreams = forceSingleStream ? 1 : 5;
+        final ExecutorService executorService = Executors.newFixedThreadPool(numStreams);

-        final List<InputStream> blobInputStreams = new ArrayList<>();
+        // Fetch blob metadata
+        final InputStream blobInputStream = readBlob(blobName);
+        final long blobSize = blobInputStream.available();
+        blobInputStream.close();
+
+        // Create input streams for the blob
+        final List<InputStream> blobInputStreams = new ArrayList<>();
         long streamSize = (int) Math.ceil(blobSize * 1.0 / numStreams);
         for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
             long start = streamNumber * streamSize;
             blobInputStreams.add(readBlob(blobName, start, streamSize));
         }

-        CompletableFuture<ReadContext> readContextFuture =
-            CompletableFuture.supplyAsync(() -> new ReadContext(blobInputStreams, null, numStreams, blobSize), executorService);
+        CompletableFuture<ReadContext> readContextFuture = CompletableFuture.supplyAsync(
+            () -> new ReadContext(blobInputStreams, null, numStreams, blobSize),
+            executorService
+        );

         executorService.shutdown();
         return readContextFuture;
-
-//        int nParts = 10;
-//        long partSize = readContext.getFileSize() / nParts;
-//        StreamContext streamContext = readContext.getStreamProvider(partSize);
-//        Directory directory = readContext.getLocalDirectory();
-//
-//        byte[] buffer = new byte[(int) readContext.getFileSize()];
-//        AtomicLong totalContentRead = new AtomicLong();
-//        CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
-//        for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
-//            int finalPartIdx = partIdx;
-//            Thread thread = new Thread(() -> {
-//                try {
-//                    InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
-//                    InputStream inputStream = inputStreamContainer.getInputStream();
-//                    long remainingContentLength = inputStreamContainer.getContentLength();
-//                    long offset = partSize * finalPartIdx;
-//                    while (remainingContentLength > 0) {
-//                        int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
-//                        totalContentRead.addAndGet(readContentLength);
-//                        remainingContentLength -= readContentLength;
-//                        offset += readContentLength;
-//                    }
-//                    inputStream.close();
-//                } catch (IOException e) {
-//                    completionListener.onFailure(e);
-//                } finally {
-//                    latch.countDown();
-//                }
-//            });
-//            thread.start();
-//        }
-//        try {
-//            if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-//                throw new IOException("Timed out waiting for file transfer to complete for " + readContext.getFileName());
-//            }
-//        } catch (InterruptedException e) {
-//            throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + readContext.getFileName());
-//        }
-//        logger.error("Buffer: {}", buffer);
-//        try (IndexOutput output = directory.createOutput(readContext.getFileName(), IOContext.DEFAULT)) {
-//            output.writeBytes(buffer, buffer.length);
-//        }
-//
-//        try {
-//            // bulks need to succeed for segment files to be generated
-//            if (isSegmentFile(readContext.getFileName()) && triggerDataIntegrityFailure) {
-//                completionListener.onFailure(
-//                    new RuntimeException(
-//                        new CorruptIndexException(
-//                            "Data integrity check failure for file: " + readContext.getFileName(),
-//                            readContext.getFileName()
-//                        )
-//                    )
-//                );
-//            } else {
-//                readContext.getDownloadFinalizer().accept(true);
-//                completionListener.onResponse(null);
-//            }
-//        } catch (Exception e) {
-//            completionListener.onFailure(e);
-//        }
     }

     private boolean isSegmentFile(String filename) {
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index ee63f20933581..510ab72fe7840 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2328,71 +2328,6 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
             + "] but got "
             + getRetentionLeases();

-        /* ActionListener<Void> listener = new ActionListener<Void>() {
-            @Override
-            public void onResponse(Void unused) {
-                try {
-                    synchronized (engineMutex) {
-                        assert currentEngineReference.get() == null : "engine is running";
-                        verifyNotClosed();
-
-                        if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
-                            if (syncFromRemote) {
-                                syncRemoteTranslogAndUpdateGlobalCheckpoint();
-                            } else {
-                                // we will enter this block when we do not want to recover from remote translog.
-                                // currently only during snapshot restore, we are coming into this block.
-                                // here, as while initiliazing remote translog we cannot skip downloading translog files,
-                                // so before that step, we are deleting the translog files present in remote store.
-                                deleteTranslogFilesFromRemoteTranslog();
-
-                            }
-                        }
-                        // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
-                        final Engine newEngine = engineFactory.newReadWriteEngine(config);
-                        onNewEngine(newEngine);
-                        currentEngineReference.set(newEngine);
-                        // We set active because we are now writing operations to the engine; this way,
-                        // we can flush if we go idle after some time and become inactive.
-                        active.set(true);
-
-                    }
-
-                    // time elapses after the engine is created above (pulling the config settings) until we set the engine reference,
-                    // during which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
-                    onSettingsChanged();
-                    assert assertSequenceNumbersInCommit();
-                    recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
-
-                } catch (IOException e) {
-                    onFailure(e);
-                    throw new UncheckedIOException(e);
-                }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-
-            }
-        };
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ActionListener<Void> latchedListener = new LatchedActionListener<>(listener, latch);
-
-        synchronized (engineMutex) {
-            if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
-                syncSegmentsFromRemoteSegmentStore(false, true, true, latchedListener);
-            } else {
-                listener.onResponse(null);
-            }
-            try {
-                latch.await();
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-*/
         synchronized (engineMutex) {
             assert currentEngineReference.get() == null : "engine is running";
             verifyNotClosed();
diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
index c3d60bad83d0e..994e3c8950270 100644
--- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -531,38 +531,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
         store.incRef();
         remoteStore.incRef();
         try {
-//            ActionListener listener = new ActionListener() {
-//                @Override
-//                public void onResponse(Object o) {
-//                    try {
-//                        if (store.directory().listAll().length == 0) {
-//                            store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
-//                        }
-//                        if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
-//                            indexShard.syncTranslogFilesFromRemoteTranslog();
-//                        } else {
-//                            bootstrap(indexShard, store);
-//                        }
-//
-//                        assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
-//                        indexShard.recoveryState().getIndex().setFileDetailsComplete();
-//                        indexShard.openEngineAndRecoverFromTranslog();
-//                        indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
-//                        indexShard.finalizeRecovery();
-//                        indexShard.postRecovery("post recovery from remote_store");
-//                    } catch (IOException e) {
-//                        throw new UncheckedIOException(e);
-//                    } finally {
-//                        store.decRef();
-//                        remoteStore.decRef();
-//                    }
-//                }
-//
-//                @Override
-//                public void onFailure(Exception e) {
-//
-//                }
-//            };
             // Download segments from remote segment store
             indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
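
Reviewer note on the new MockFsVerifyingBlobContainer#asyncBlobDownload: the blob is partitioned with ceil division into numStreams ranges, and each range is opened as its own readBlob(blobName, start, streamSize) stream before being wrapped in a ReadContext. Below is a minimal, self-contained sketch of that partitioning scheme using only the JDK; the names MultiStreamDownloadSketch and readRange are hypothetical stand-ins, not OpenSearch APIs. The sketch makes two details explicit that the patch leaves implicit: with ceil division the final requested range can extend past the end of the blob, so the ranged read has to clamp (the patch presumably relies on readBlob to tolerate the over-long final range), and the (int) cast in "long streamSize = (int) Math.ceil(blobSize * 1.0 / numStreams);" would truncate for blobs larger than Integer.MAX_VALUE bytes, which is harmless in a test mock; the sketch casts to long instead.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class MultiStreamDownloadSketch {

        // Stand-in for a ranged BlobContainer#readBlob(name, start, length), serving a slice of an in-memory blob.
        static InputStream readRange(byte[] blob, long start, long length) {
            // Clamp the final range so it does not run past the end of the blob.
            int len = (int) Math.min(length, blob.length - start);
            return new ByteArrayInputStream(blob, (int) start, len);
        }

        public static void main(String[] args) {
            byte[] blob = new byte[1003]; // deliberately not a multiple of the stream count
            int numStreams = 5;

            // Same partitioning as the patch: ceil division so numStreams ranges cover the whole blob.
            long streamSize = (long) Math.ceil(blob.length * 1.0 / numStreams);

            List<InputStream> streams = new ArrayList<>();
            for (int i = 0; i < numStreams; i++) {
                streams.add(readRange(blob, i * streamSize, streamSize));
            }

            // Drain each range on its own task, mirroring the parallel-download intent of ReadContext.
            ExecutorService executor = Executors.newFixedThreadPool(numStreams);
            List<CompletableFuture<Integer>> parts = new ArrayList<>();
            for (InputStream stream : streams) {
                parts.add(CompletableFuture.supplyAsync(() -> {
                    try (InputStream s = stream) {
                        int read = 0;
                        byte[] buf = new byte[256];
                        int n;
                        while ((n = s.read(buf)) != -1) {
                            read += n;
                        }
                        return read;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }, executor));
            }

            int total = parts.stream().mapToInt(CompletableFuture::join).sum();
            executor.shutdown();
            System.out.println("bytes read: " + total + " of " + blob.length);
        }
    }

Running the sketch prints "bytes read: 1003 of 1003": four full ranges of 201 bytes plus a clamped final range of 199 bytes exactly cover the 1003-byte blob.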