Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CacheFile#fsync() method to ensure cached data are written on disk #64201

Merged
merged 9 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,21 @@ public FileVisitResult visitFileFailed(final Path file, final IOException exc) t
* systems and operating systems allow to fsync on a directory)
*/
public static void fsync(final Path fileToSync, final boolean isDir) throws IOException {
// Delegate to the three-argument overload, passing true for its metaData parameter.
fsync(fileToSync, isDir, true);
}

/**
* Ensure that any writes to the given file are written to the storage device that contains it. The {@code isDir} parameter specifies
* whether or not the path to sync is a directory. This is needed because we open for read and ignore an {@link IOException} since not
* all filesystems and operating systems support fsyncing on a directory. For regular files we must open for write for the fsync to have
* an effect.
*
* @param fileToSync the file to fsync
* @param isDir if true, the given file is a directory (we open for read and ignore {@link IOException}s, because not all file
* systems and operating systems allow to fsync on a directory)
* @param metaData if {@code true} both the file's content and metadata will be synced, otherwise only the file's content will be synced
*/
public static void fsync(final Path fileToSync, final boolean isDir, final boolean metaData) throws IOException {
if (isDir && WINDOWS) {
// opening a directory on Windows fails, directories can not be fsynced there
if (Files.exists(fileToSync) == false) {
Expand All @@ -290,7 +305,7 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx
}
try (FileChannel file = FileChannel.open(fileToSync, isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)) {
try {
file.force(true);
file.force(metaData);
} catch (final IOException e) {
if (isDir) {
assert (LINUX || MAC_OS_X) == false :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ public void delete(final Path path) throws IOException {

}

// Test helper: forwards to the three-argument IOUtils.fsync, passing the result of randomBoolean() as the third argument.
private void fsync(final Path path, final boolean isDir) throws IOException {
IOUtils.fsync(path, isDir, randomBoolean());
}

public void testFsyncDirectory() throws Exception {
final Path path = createTempDir().toRealPath();
final Path subPath = path.resolve(randomAlphaOfLength(8));
Files.createDirectories(subPath);
IOUtils.fsync(subPath, true);
fsync(subPath, true);
// no exception
}

Expand Down Expand Up @@ -246,16 +250,16 @@ public void testFsyncAccessDeniedOpeningDirectory() throws Exception {
final Path wrapped = new FilterPath(path, fs);
if (Constants.WINDOWS) {
// no exception, we early return and do not even try to open the directory
IOUtils.fsync(wrapped, true);
fsync(wrapped, true);
} else {
expectThrows(AccessDeniedException.class, () -> IOUtils.fsync(wrapped, true));
expectThrows(AccessDeniedException.class, () -> fsync(wrapped, true));
}
}

public void testFsyncNonExistentDirectory() throws Exception {
final Path dir = FilterPath.unwrap(createTempDir()).toRealPath();
final Path nonExistentDir = dir.resolve("non-existent");
expectThrows(NoSuchFileException.class, () -> IOUtils.fsync(nonExistentDir, true));
expectThrows(NoSuchFileException.class, () -> fsync(nonExistentDir, true));
}

public void testFsyncFile() throws IOException {
Expand All @@ -266,7 +270,7 @@ public void testFsyncFile() throws IOException {
try (OutputStream o = Files.newOutputStream(file)) {
o.write("0\n".getBytes(StandardCharsets.US_ASCII));
}
IOUtils.fsync(file, false);
fsync(file, false);
// no exception
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -67,6 +70,13 @@ protected void closeInternal() {

private final Set<EvictionListener> listeners = new HashSet<>();

/**
* Indicates whether the cache file needs to be synchronized with the storage device that contains it in order to durably persist
* its ranges of cached data. An empty cache file does not need to be fsynced; writing new data to the cache file
* will toggle the flag to {@code true}.
**/
private final AtomicBoolean needsFsync = new AtomicBoolean();

/**
* A reference counted holder for the current channel to the physical file backing this cache file instance.
* By guarding access to the file channel by ref-counting and giving the channel its own life-cycle we remove all need for
Expand Down Expand Up @@ -310,6 +320,7 @@ protected void doRun() throws Exception {
reference.decRef();
}
gap.onCompletion();
needsFsync.set(true);
}

@Override
Expand Down Expand Up @@ -403,4 +414,51 @@ public Tuple<Long, Long> getAbsentRangeWithin(long start, long end) {
ensureOpen();
return tracker.getAbsentRangeWithin(start, end);
}

// used in tests
// Returns the current value of the needsFsync flag.
boolean needsFsync() {
return needsFsync.get();
}

/**
 * Flushes the data written to this cache file to the storage device that contains it, but only when new data has been written since
 * the previous call. When a synchronization is actually performed, the sorted set of ranges that were completed at the time the
 * method started is returned. When the cache file has been evicted, when no fsync is pending, or when another thread is already
 * fsyncing the file, an empty set is returned instead.
 *
 * @return the sorted set of completed ranges iff this call performed the fsync, otherwise an empty sorted set
 * @throws IOException if the cache file could not be fsynced
 * @throws java.nio.file.NoSuchFileException if the cache file does not exist
 */
public SortedSet<Tuple<Long, Long>> fsync() throws IOException {
    // Could not acquire a reference: the cache file is expected to have been evicted.
    if (refCounter.tryIncRef() == false) {
        assert evicted.get();
        return Collections.emptySortedSet();
    }
    try {
        // Only the thread that flips the flag from true to false performs the fsync.
        if (needsFsync.compareAndSet(true, false) == false) {
            return Collections.emptySortedSet();
        }
        boolean synced = false;
        try {
            // Snapshot the completed ranges before the fsync: ranges completed afterwards are not reported to the caller as
            // persisted, even if they happen to be on disk by the time the fsync is effectively executed.
            final SortedSet<Tuple<Long, Long>> persistedRanges = tracker.getCompletedRanges();
            assert persistedRanges != null;
            assert persistedRanges.isEmpty() == false;

            IOUtils.fsync(file, false, false); // TODO don't forget to fsync parent directory
            synced = true;
            return persistedRanges;
        } finally {
            // On any failure, restore the flag so that a later call retries the fsync.
            if (synced == false) {
                needsFsync.set(true);
            }
        }
    } finally {
        refCounter.decRef();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ public long getLength() {
return length;
}

public List<Tuple<Long, Long>> getCompletedRanges() {
List<Tuple<Long, Long>> completedRanges = null;
public SortedSet<Tuple<Long, Long>> getCompletedRanges() {
SortedSet<Tuple<Long, Long>> completedRanges = null;
synchronized (mutex) {
assert invariant();
for (Range range : ranges) {
if (range.isPending()) {
continue;
}
if (completedRanges == null) {
completedRanges = new ArrayList<>();
completedRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1));
}
completedRanges.add(Tuple.tuple(range.start, range.end));
}
}
return completedRanges == null ? Collections.emptyList() : completedRanges;
return completedRanges == null ? Collections.emptySortedSet() : completedRanges;
}

/**
Expand Down
Loading