diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java index 09366a38a9957..fedf601961476 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java @@ -57,7 +57,7 @@ * one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue * until all chunk requests are sent/responded. */ -abstract class MultiFileTransfer implements Closeable { +public abstract class MultiFileTransfer implements Closeable { private Status status = Status.PROCESSING; private final Logger logger; private final ActionListener listener; @@ -121,7 +121,7 @@ private void handleItems(List>> return; } final long requestSeqId = requestSeqIdTracker.generateSeqNo(); - sendChunkRequest(request.v2(), ActionListener.wrap( + executeChunkRequest(request.v2(), ActionListener.wrap( r -> addItem(requestSeqId, request.v1(), null), e -> addItem(requestSeqId, request.v1(), e))); } @@ -179,7 +179,7 @@ private Tuple getNextRequest() throws Exception { protected abstract Request nextChunkRequest(StoreFileMetaData md) throws IOException; - protected abstract void sendChunkRequest(Request request, ActionListener listener); + protected abstract void executeChunkRequest(Request request, ActionListener listener); protected abstract void handleError(StoreFileMetaData md, Exception e) throws Exception; @@ -195,7 +195,7 @@ private static class FileChunkResponseItem { } } - protected interface ChunkRequest { + public interface ChunkRequest { /** * @return {@code true} if this chunk request is the last chunk of the current file */ diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java index 87a6d18671a6f..b6a5d2c908842 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java @@ -27,9 +27,11 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.transport.Transports; import java.io.IOException; import java.util.Arrays; @@ -39,10 +41,12 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; -public class MultiFileWriter implements Releasable { +public class MultiFileWriter extends AbstractRefCounted implements Releasable { public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) { + super("multi_file_writer"); this.store = store; this.indexState = indexState; this.tempFilePrefix = tempFilePrefix; @@ -51,6 +55,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF } private final Runnable ensureOpen; + private final AtomicBoolean closed = new AtomicBoolean(false); private final Logger logger; private final Store store; private final RecoveryState.Index indexState; @@ -64,6 +69,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk) throws IOException { + assert Transports.assertNotTransportThread("multi_file_writer"); final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter()); writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk)); } @@ -138,6 +144,13 @@ private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position, @Override public void close() { + if (closed.compareAndSet(false, true)) { + decRef(); + } + } + + @Override + protected void closeInternal() { fileChunkWriters.clear(); // clean open index outputs Iterator> iterator = openIndexOutputs.entrySet().iterator(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c14fbca308910..1d45d048c9ba4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -890,7 +890,7 @@ protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException { } @Override - protected void sendChunkRequest(FileChunk request, ActionListener listener) { + protected void executeChunkRequest(FileChunk request, ActionListener listener) { cancellableThreads.checkForCancel(); recoveryTarget.writeFileChunk( request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 8ab3b12b7a756..12c8e72a0cb02 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.IndexCommit; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -21,6 +20,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -31,18 +31,16 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException; import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; @@ -54,6 +52,7 @@ import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.recovery.MultiFileTransfer; import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; @@ -87,12 +86,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; -import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease; @@ -473,97 +471,82 @@ void restoreFiles(Store store) { } @Override - protected void restoreFiles(List filesToRecover, Store store) throws IOException { + protected void restoreFiles(List filesToRecover, Store store) { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); + final PlainActionFuture restoreFilesFuture = new PlainActionFuture<>(); + final List mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList()); + final MultiFileTransfer multiFileTransfer = new MultiFileTransfer<>( + logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) { - try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { - })) { - final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - final AtomicReference> error = new AtomicReference<>(); + final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {}); + long offset = 0; - for (FileInfo fileInfo : filesToRecover) { - final long fileLength = fileInfo.length(); - long offset = 0; - while (offset < fileLength && error.get() == null) { - final long requestSeqId = requestSeqIdTracker.generateSeqNo(); - try { - requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks()); - - if (error.get() != null) { - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - break; - } - - final int bytesRequested = Math.toIntExact( - Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset)); - offset += bytesRequested; - - final GetCcrRestoreFileChunkRequest request = - new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested); - logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId, - fileInfo.name(), offset, bytesRequested); - - TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); - ActionListener listener = - ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap( - r -> threadPool.generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } - - @Override - protected void doRun() throws Exception { - final int actualChunkSize = r.getChunk().length(); - logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId, - snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize); - final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); - throttleListener.accept(nanosPaused); - final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength; - multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } - }), - e -> { - error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } - ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME); - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); - } catch (Exception e) { - error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } - } + @Override + protected void onNewFile(StoreFileMetaData md) { + offset = 0; } - try { - requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ElasticsearchException(e); + @Override + protected FileChunk nextChunkRequest(StoreFileMetaData md) { + final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset)); + offset += bytesRequested; + return new FileChunk(md, bytesRequested, offset == md.length()); } - if (error.get() != null) { - handleError(store, error.get().v2()); + + @Override + protected void executeChunkRequest(FileChunk request, ActionListener listener) { + final ActionListener threadedListener + = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap( + r -> { + writeFileChunk(request.md, r); + listener.onResponse(null); + }, listener::onFailure), false); + + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, + new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested), + ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(), + ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME)); } - } - logger.trace("[{}] completed CCR restore", shardId); - } + private void writeFileChunk(StoreFileMetaData md, + GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception { + final int actualChunkSize = r.getChunk().length(); + logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", + shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize); + final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); + throttleListener.accept(nanosPaused); + multiFileWriter.incRef(); + try (Releasable ignored = multiFileWriter::decRef) { + final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length(); + multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk); + } catch (Exception e) { + handleError(md, e); + throw e; + } + } + + @Override + protected void handleError(StoreFileMetaData md, Exception e) throws Exception { + final IOException corruptIndexException; + if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { + try { + store.markStoreCorrupted(corruptIndexException); + } catch (IOException ioe) { + logger.warn("store cannot be marked as corrupted", e); + } + throw corruptIndexException; + } + throw e; + } - private void handleError(Store store, Exception e) throws IOException { - final IOException corruptIndexException; - if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { - try { - store.markStoreCorrupted(corruptIndexException); - } catch (IOException ioe) { - logger.warn("store cannot be marked as corrupted", e); + @Override + public void close() { + multiFileWriter.close(); } - throw corruptIndexException; - } else { - ExceptionsHelper.reThrowIfNotNull(e); - } + }; + multiFileTransfer.start(); + restoreFilesFuture.actionGet(); + logger.trace("[{}] completed CCR restore", shardId); } @Override @@ -572,5 +555,22 @@ public void close() { ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } + + private static class FileChunk implements MultiFileTransfer.ChunkRequest { + final StoreFileMetaData md; + final int bytesRequested; + final boolean lastChunk; + + FileChunk(StoreFileMetaData md, int bytesRequested, boolean lastChunk) { + this.md = md; + this.bytesRequested = bytesRequested; + this.lastChunk = lastChunk; + } + + @Override + public boolean lastChunk() { + return lastChunk; + } + } } }