diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index e72ad501d43db..88f4e974beae1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -71,10 +71,8 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -376,16 +374,6 @@ void restoreFiles() throws IOException { restore(snapshotFiles); } - private static class FileSession { - FileSession(long lastTrackedSeqNo, long lastOffset) { - this.lastTrackedSeqNo = lastTrackedSeqNo; - this.lastOffset = lastOffset; - } - - final long lastTrackedSeqNo; - final long lastOffset; - } - @Override protected void restoreFiles(List filesToRecover, Store store) throws IOException { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); @@ -395,56 +383,27 @@ protected void restoreFiles(List filesToRecover, Store store) throws I final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); final AtomicReference> error = new AtomicReference<>(); - final ArrayDeque remainingFiles = new ArrayDeque<>(filesToRecover); - final Map inFlightRequests = new HashMap<>(); - final Object mutex = new Object(); - - while (true) { - if (error.get() != null) { - break; - } - final FileInfo fileToRecover; - final FileSession prevFileSession; - synchronized (mutex) { - if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) { - break; - } - final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks(); - if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) { - for (int i = 0; i < maxConcurrentFileChunks; i++) { - if (remainingFiles.isEmpty()) { - break; - } - inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0)); - } - } - final Map.Entry minEntry = - inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get(); - prevFileSession = minEntry.getValue(); - fileToRecover = minEntry.getKey(); - } - try { - requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo); - final FileSession fileSession; - synchronized (mutex) { - fileSession = inFlightRequests.get(fileToRecover); - // if file has been removed in the mean-while, it means that restore of this file completed, so start working - // on the next one - if (fileSession == null) { - continue; - } - } + for (FileInfo fileInfo : filesToRecover) { + final long fileLength = fileInfo.length(); + long offset = 0; + while (offset < fileLength && error.get() == null) { final long requestSeqId = requestSeqIdTracker.generateSeqNo(); try { - synchronized (mutex) { - inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset)); + requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks()); + + if (error.get() != null) { + requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); + break; } - final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), - fileToRecover.length() - fileSession.lastOffset)); + + final int bytesRequested = Math.toIntExact( + Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset)); + offset += bytesRequested; + final GetCcrRestoreFileChunkRequest request = - new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested); + new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested); logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId, - fileToRecover.name(), fileSession.lastOffset, bytesRequested); + fileInfo.name(), offset, bytesRequested); TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); ActionListener listener = @@ -452,7 +411,7 @@ protected void restoreFiles(List filesToRecover, Store store) throws I r -> threadPool.generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); } @@ -460,53 +419,27 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { final int actualChunkSize = r.getChunk().length(); logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId, - snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize); + snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize); final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); throttleListener.accept(nanosPaused); - final long newOffset = r.getOffset() + actualChunkSize; - - assert r.getOffset() == fileSession.lastOffset; - assert actualChunkSize == bytesRequested; - assert newOffset <= fileToRecover.length(); - final boolean lastChunk = newOffset >= fileToRecover.length(); - multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(), - lastChunk); - if (lastChunk) { - synchronized (mutex) { - final FileSession removed = inFlightRequests.remove(fileToRecover); - assert removed != null : "session disappeared for " + fileToRecover.name(); - assert removed.lastTrackedSeqNo == requestSeqId; - assert removed.lastOffset == fileSession.lastOffset; - } - } else { - synchronized (mutex) { - final FileSession replaced = inFlightRequests.replace(fileToRecover, - new FileSession(requestSeqId, newOffset)); - assert replaced != null : "session disappeared for " + fileToRecover.name(); - assert replaced.lastTrackedSeqNo == requestSeqId; - assert replaced.lastOffset == fileSession.lastOffset; - } - } + final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength; + multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); } }), e -> { - error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); } ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME); remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); } catch (Exception e) { - error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); - throw e; } - } catch (Exception e) { - error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); - break; } - } + try { requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()); } catch (InterruptedException e) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 65fb14fdb95ec..1c3ab60dd27cb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -208,11 +208,7 @@ private Store.MetadataSnapshot getMetaData() throws IOException { } private long readFileBytes(String fileName, BytesReference reference) throws IOException { - Releasable lock = keyedLock.tryAcquire(fileName); - if (lock == null) { - throw new IllegalStateException("can't read from the same file on the same session concurrently"); - } - try (Releasable releasable = lock) { + try (Releasable ignored = keyedLock.acquire(fileName)) { final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> { try { return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE);