Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smarter CCR concurrent file chunk fetching #38841

Merged
merged 4 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -376,16 +374,6 @@ void restoreFiles() throws IOException {
restore(snapshotFiles);
}

/**
 * Immutable snapshot of how far the restore of a single file has progressed.
 * A new instance is created whenever the progress of a file changes; existing
 * instances are never modified.
 */
private static class FileSession {
    /**
     * Sequence number of the most recent chunk request tracked for the file
     * (callers pass {@code NO_OPS_PERFORMED} before any request has been issued).
     */
    final long lastTrackedSeqNo;

    /** Offset in the file at which the next chunk is expected to start. */
    final long lastOffset;

    FileSession(long lastTrackedSeqNo, long lastOffset) {
        this.lastTrackedSeqNo = lastTrackedSeqNo;
        this.lastOffset = lastOffset;
    }
}

@Override
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
Expand All @@ -395,118 +383,63 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();

final ArrayDeque<FileInfo> remainingFiles = new ArrayDeque<>(filesToRecover);
final Map<FileInfo, FileSession> inFlightRequests = new HashMap<>();
final Object mutex = new Object();

while (true) {
if (error.get() != null) {
break;
}
final FileInfo fileToRecover;
final FileSession prevFileSession;
synchronized (mutex) {
if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
break;
}
final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks();
if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) {
for (int i = 0; i < maxConcurrentFileChunks; i++) {
if (remainingFiles.isEmpty()) {
break;
}
inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0));
}
}
final Map.Entry<FileInfo, FileSession> minEntry =
inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
prevFileSession = minEntry.getValue();
fileToRecover = minEntry.getKey();
}
try {
requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo);
final FileSession fileSession;
synchronized (mutex) {
fileSession = inFlightRequests.get(fileToRecover);
// if file has been removed in the mean-while, it means that restore of this file completed, so start working
// on the next one
if (fileSession == null) {
continue;
}
}
for (FileInfo fileInfo : filesToRecover) {
final long fileLength = fileInfo.length();
long offset = 0;
while (offset < fileLength && error.get() == null) {
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
try {
synchronized (mutex) {
inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset));
requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());

if (error.get() != null) {
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
break;
}
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(),
fileToRecover.length() - fileSession.lastOffset));

final int bytesRequested = Math.toIntExact(
Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
offset += bytesRequested;

final GetCcrRestoreFileChunkRequest request =
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
fileToRecover.name(), fileSession.lastOffset, bytesRequested);
fileInfo.name(), offset, bytesRequested);

TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
r -> threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}

@Override
protected void doRun() throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize);
snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
final long newOffset = r.getOffset() + actualChunkSize;

assert r.getOffset() == fileSession.lastOffset;
assert actualChunkSize == bytesRequested;
assert newOffset <= fileToRecover.length();
final boolean lastChunk = newOffset >= fileToRecover.length();
multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
lastChunk);
if (lastChunk) {
synchronized (mutex) {
final FileSession removed = inFlightRequests.remove(fileToRecover);
assert removed != null : "session disappeared for " + fileToRecover.name();
assert removed.lastTrackedSeqNo == requestSeqId;
assert removed.lastOffset == fileSession.lastOffset;
}
} else {
synchronized (mutex) {
final FileSession replaced = inFlightRequests.replace(fileToRecover,
new FileSession(requestSeqId, newOffset));
assert replaced != null : "session disappeared for " + fileToRecover.name();
assert replaced.lastTrackedSeqNo == requestSeqId;
assert replaced.lastOffset == fileSession.lastOffset;
}
}
final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
}),
e -> {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
throw e;
}
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
break;
}

}

try {
requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,7 @@ private Store.MetadataSnapshot getMetaData() throws IOException {
}

private long readFileBytes(String fileName, BytesReference reference) throws IOException {
Releasable lock = keyedLock.tryAcquire(fileName);
if (lock == null) {
throw new IllegalStateException("can't read from the same file on the same session concurrently");
}
try (Releasable releasable = lock) {
try (Releasable ignored = keyedLock.acquire(fileName)) {
final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> {
try {
return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE);
Expand Down