Reduce recovery time with compress or secure transport #36981

Merged · 19 commits · Jan 14, 2019
Changes from 3 commits
docs/reference/modules/indices/recovery.asciidoc (3 additions, 0 deletions)
@@ -6,5 +6,8 @@ The following _expert_ setting can be set to manage the recovery policy.
`indices.recovery.max_bytes_per_sec`::
Defaults to `40mb`.

`indices.recovery.max_concurrent_file_chunks`::
Controls the number of file chunk requests that can be sent in parallel per recovery. Defaults to `2`.

This setting can be dynamically updated on a live cluster with the
<<cluster-update-settings,cluster-update-settings>> API:
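For illustration only, raising the new limit on a live cluster through the cluster settings API would look roughly like this (example value, not part of this change):

PUT _cluster/settings
{
  "transient": {
    "indices.recovery.max_concurrent_file_chunks": 4
  }
}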

@@ -214,6 +214,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,

@@ -179,7 +179,8 @@ private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest r
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt());
handler = new RecoverySourceHandler(shard, recoveryTarget, request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
return handler;
}
}

@@ -596,8 +596,7 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
}

recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
request.lastChunk(), request.totalTranslogOps()
);
request.lastChunk(), request.totalTranslogOps()).get();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

@@ -39,6 +39,12 @@ public class RecoverySettings {
Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Property.Dynamic, Property.NodeScope);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 8, Property.Dynamic, Property.NodeScope);

/**
* how long to wait before retrying after issues cause by cluster state syncing between nodes
* i.e., local node is not yet known on remote node, remote shard not yet started etc.
@@ -78,6 +84,7 @@ public class RecoverySettings {
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile ByteSizeValue maxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
@@ -89,6 +96,7 @@ public class RecoverySettings {

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
@@ -108,6 +116,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
@@ -180,4 +189,12 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
}

public int getMaxConcurrentFileChunks() {
return maxConcurrentFileChunks;
}

private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}
}

@@ -68,6 +68,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -94,6 +95,7 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentFileChunks;

protected final RecoveryResponse response;

@@ -113,16 +115,17 @@ protected void onCancel(String reason, @Nullable Exception suppressedException)
}
};

public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
final StartRecoveryRequest request,
final int fileChunkSizeInBytes) {
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
this.request = request;
this.shardId = this.request.shardId().id();
this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
this.chunkSizeInBytes = fileChunkSizeInBytes;
this.response = new RecoveryResponse();
// if the target is on an old version, it won't be able to handle out-of-order file chunks.
this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_7_0_0) ? maxConcurrentFileChunks : 1;
}

public StartRecoveryRequest getRequest() {
@@ -639,8 +642,11 @@ final class RecoveryOutputStream extends OutputStream {
private final StoreFileMetaData md;
private final Supplier<Integer> translogOps;
private long position = 0;
private final Semaphore sendingTickets;
private volatile Exception error;

RecoveryOutputStream(StoreFileMetaData md, Supplier<Integer> translogOps) {
this.sendingTickets = new Semaphore(maxConcurrentFileChunks);
this.md = md;
this.translogOps = translogOps;
}
@@ -658,14 +664,54 @@ public void write(byte[] b, int offset, int length) throws IOException {
}

private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
// Actually send the file chunk to the target node, waiting for it to complete
cancellableThreads.executeIO(() ->
// Actually send the file chunk to the target node.
cancellableThreads.executeIO(() -> {
sendingTickets.acquire();
try {
throwAndClearErrorIfExist();
recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogOps.get())
);
.whenComplete((res, ex) -> {
if (ex != null) {
synchronized (this) {
if (error == null) {
error = (Exception) ex;
} else {
error.addSuppressed(ex);
}
}
}
sendingTickets.release();
});
} catch (Exception e) {
sendingTickets.release();
throw e;
}
});
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
}
}

@Override
public void close() {
// wait for the completion of all ongoing chunks then check for the existing error.
cancellableThreads.execute(() -> sendingTickets.acquire(maxConcurrentFileChunks));
try {
throwAndClearErrorIfExist();
} finally {
sendingTickets.release(maxConcurrentFileChunks);
}
}

private void throwAndClearErrorIfExist() {
final Exception existingError = this.error;
if (existingError != null) {
synchronized (this) {
this.error = null;
}
throw ExceptionsHelper.convertToRuntime(existingError);
}
}
}

void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
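The ticketing scheme above is a bounded in-flight pattern: acquire a permit before each asynchronous chunk send, release it when the response (or failure) comes back, and drain all permits before closing so every outstanding chunk has completed and any recorded failure is surfaced. A minimal standalone sketch of that pattern, with made-up names and no Elasticsearch types:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative only: bound the number of in-flight asynchronous sends with a semaphore of "tickets".
final class BoundedSender {
    private final Semaphore tickets;
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final int maxInFlight;

    BoundedSender(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.tickets = new Semaphore(maxInFlight);
    }

    // Blocks once maxInFlight sends are outstanding; the permit is returned when the send completes.
    void send(Supplier<CompletableFuture<Void>> chunkSend) throws InterruptedException {
        tickets.acquire();
        try {
            chunkSend.get().whenComplete((ignored, ex) -> {
                if (ex instanceof Exception) {
                    failure.compareAndSet(null, (Exception) ex); // remember the first error, surface it later
                }
                tickets.release();
            });
        } catch (RuntimeException e) {
            tickets.release(); // don't leak a permit if the send could not even be started
            throw e;
        }
    }

    // Waits until every outstanding send has completed, then rethrows the first recorded failure, if any.
    void finish() throws Exception {
        tickets.acquire(maxInFlight);
        tickets.release(maxInFlight);
        final Exception e = failure.get();
        if (e != null) {
            throw e;
        }
    }
}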

@@ -55,10 +55,13 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,6 +92,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final AtomicBoolean finished = new AtomicBoolean();

private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<String, FileChunkWriter> fileChunkWriters = ConcurrentCollections.newConcurrentMap();
private final CancellableThreads cancellableThreads;

// last time this status was accessed
@@ -487,19 +491,18 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
}
}

@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException {
private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position,
BytesReference content, boolean lastChunk) throws IOException {
final Store store = store();
final String name = fileMetaData.name();
state().getTranslog().totalOperations(totalTranslogOps);
final RecoveryState.Index indexState = state().getIndex();
IndexOutput indexOutput;
if (position == 0) {
indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
} else {
indexOutput = getOpenIndexOutput(name);
}
assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position;
BytesRefIterator iterator = content.iterator();
BytesRef scratch;
while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls
Expand All @@ -522,6 +525,59 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
}
}

@Override
public CompletableFuture<Void> writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
return CompletableFuture.completedFuture(null);
}

private static final class FileChunk {
final StoreFileMetaData md;
final BytesReference content;
final long position;
final boolean lastChunk;
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}
}

private final class FileChunkWriter {
// chunks can be delivered out of order, we need to buffer chunks if there's a gap between them.
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
long lastPosition = 0;

void writeChunk(FileChunk newChunk) throws IOException {
synchronized (this) {
pendingChunks.add(newChunk);
}
while (true) {
final FileChunk chunk;
synchronized (this) {
chunk = pendingChunks.peek();
if (chunk == null || chunk.position != lastPosition) {
return;
}
pendingChunks.remove();
}
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
synchronized (this) {
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
lastPosition += chunk.content.length();
if (chunk.lastChunk) {
assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]";
fileChunkWriters.remove(chunk.md.name());
}
}
}
}
}

Path translogLocation() {
return indexShard().shardPath().resolveTranslog();
}
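Because the source can now have several chunks of the same file in flight, the target may see them out of order; the FileChunkWriter above buffers chunks in a priority queue keyed by file position and only writes a chunk once its offset matches the next expected position. A stripped-down sketch of that reordering idea, using made-up names and a plain byte[] payload (the real code writes to a Lucene IndexOutput and releases its lock while writing):

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.ObjLongConsumer;

// Illustrative only: apply chunks strictly in file-offset order even if they arrive out of order.
final class ChunkReorderer {
    static final class Chunk {
        final long position;
        final byte[] data;
        Chunk(long position, byte[] data) {
            this.position = position;
            this.data = data;
        }
    }

    private final PriorityQueue<Chunk> pending = new PriorityQueue<>(Comparator.comparingLong((Chunk c) -> c.position));
    private long nextPosition = 0;

    // Buffer the chunk, then drain everything that has become contiguous with the data already written.
    synchronized void accept(Chunk chunk, ObjLongConsumer<byte[]> writeAt) {
        pending.add(chunk);
        while (pending.isEmpty() == false && pending.peek().position == nextPosition) {
            final Chunk next = pending.poll();
            writeAt.accept(next.data, next.position); // the real code writes to a Lucene IndexOutput here
            nextPosition += next.data.length;
        }
    }
}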

@@ -26,6 +26,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;


public interface RecoveryTargetHandler {
@@ -89,7 +90,7 @@ void receiveFileInfo(List<String> phase1FileNames,
void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException;

/** writes a partial file chunk to the target store */
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException;
CompletableFuture<Void> writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException;

}

@@ -28,13 +28,17 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@@ -142,8 +146,8 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
}

@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean
lastChunk, int totalTranslogOps) throws IOException {
public CompletableFuture<Void> writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
@@ -166,14 +170,25 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
throttleTimeInNanos = 0;
}

final CompletableFuture<Void> future = new CompletableFuture<>();
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk,
totalTranslogOps,
/* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
throttleTimeInNanos), fileChunkRequestOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
future.complete(null);
}
@Override
public void handleException(TransportException exp) {
future.completeExceptionally(exp);
}
});
return future;
}

}
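The handler above is a small callback-to-future adaptation: the transport layer reports success or failure through a response handler, and the recovery code would rather compose on a CompletableFuture. Reduced to its essentials, with illustrative types that are not the transport API:

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative only: bridge a callback-style response handler to a CompletableFuture.
final class FutureAdapter {
    interface Callback<T> { // stand-in for a transport response handler
        void onResponse(T response);
        void onFailure(Exception e);
    }

    static <T> CompletableFuture<T> adapt(Consumer<Callback<T>> asyncCall) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Callback<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future; // callers can now chain, combine, or block on the result
    }
}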