Fix race with eviction when reading from FileCache
The previous implementation had an inherent race condition where a
zero-reference count IndexInput read from the cache could be evicted
before the IndexInput was cloned (and therefore had its reference count
incremented). Since the IndexInputs are stateful, this is very bad. The
least-recently-used semantics meant that in a properly configured system
this would be unlikely, since accessing a zero-reference count item
would move it to be most-recently used and therefore least likely to be
evicted. However, a latent bug remained that was still possible to
encounter (see issue #6295).
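
To make that window concrete, here is a minimal sketch of the pattern being
described, using hypothetical, simplified stand-in types rather than the actual
FileCache and IndexInput classes; the gap between the lookup and the clone is
where eviction can slip in:

// Illustrative sketch only: hypothetical, simplified stand-ins, not the real
// OpenSearch FileCache or IndexInput types.
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;

class EvictionRaceSketch {
    static class CachedInput {
        volatile boolean closed = false;   // eviction closes the underlying resources
        int refCount = 0;                  // a cached-but-unused entry sits at zero references

        CachedInput openClone() {
            if (closed) {
                throw new IllegalStateException("cloned after eviction already closed this input");
            }
            return this;
        }
    }

    final ConcurrentHashMap<Path, CachedInput> cache = new ConcurrentHashMap<>();

    CachedInput fetch(Path key) {
        CachedInput origin = cache.get(key);   // assume a cache hit; refCount is still 0 here...
        // ...so an eviction running concurrently is free to remove and close `origin`
        // right now, before this thread ever increments the reference count.
        return origin.openClone();             // may operate on a closed, stateful input
    }
}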

The only way to fix this, as far as I can see, is to change the cache
behavior so that fetching an item from the cache atomically
increments its reference count. This also led to a change to
TransferManager to ensure that all requests for an item ultimately read
through the cache to eliminate any possibility of a race. I have
implemented some concurrent unit tests that put the cache into a
worst-case thrashing scenario to ensure that concurrent access never
closes an IndexInput while it is still being used.
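
A rough sketch of that contract, again with hypothetical stand-in types rather
than the real FileCache API: compute is atomic per key, so the entry is created
or found and its reference count incremented before any evictor can observe it,
and the caller releases its reference only after the clone exists:

// Illustrative sketch only: hypothetical types, not the actual OpenSearch FileCache.
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class AcquireOnFetchSketch {
    static class CachedInput {
        final AtomicInteger refCount = new AtomicInteger();
        CachedInput openClone() { return this; }   // stands in for IndexInput#clone()
    }

    private final ConcurrentHashMap<Path, CachedInput> cache = new ConcurrentHashMap<>();

    CachedInput fetch(Path key) {
        // compute is atomic per key: the entry is created (or found) and its
        // reference count is incremented before any evictor can see it at zero.
        CachedInput origin = cache.compute(key, (k, existing) -> {
            CachedInput entry = (existing != null) ? existing : new CachedInput();
            entry.refCount.incrementAndGet();
            return entry;
        });
        try {
            return origin.openClone();   // safe: this thread still holds a reference
        } finally {
            release(key);                // drop the reference only after the clone exists
        }
    }

    void release(Path key) {
        cache.computeIfPresent(key, (k, entry) -> {
            entry.refCount.decrementAndGet();
            return entry;
        });
    }

    // An evictor that honors this contract only removes and closes entries whose
    // refCount is zero, so nothing fetched through compute can be closed mid-use.
}

The decRef-after-clone ordering in this sketch mirrors the inline comment in the
TransferManager diff below.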

Signed-off-by: Andrew Ross <andrross@amazon.com>
andrross committed Mar 11, 2023
1 parent ef3eef0 commit a177172
Showing 15 changed files with 482 additions and 595 deletions.
@@ -144,13 +144,7 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
             .directory(directory)
             .fileName(blockFileName)
             .build();
-        try {
-            return transferManager.fetchBlob(blobFetchRequest);
-        } catch (InterruptedException e) {
-            logger.error("Interrupted while fetching [{}]", blobFetchRequest);
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException(e);
-        }
+        return transferManager.fetchBlob(blobFetchRequest);
     }

     @Override
@@ -18,7 +18,6 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import java.util.function.BiFunction;

 import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
@@ -52,21 +51,17 @@ public long capacity() {
         return theCache.capacity();
     }

     @Override
     public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
         return theCache.put(filePath, indexInput);
     }

     @Override
-    public void putAll(Map<? extends Path, ? extends CachedIndexInput> m) {
-        theCache.putAll(m);
-    }
-
-    @Override
-    public CachedIndexInput computeIfPresent(
+    public CachedIndexInput compute(
         Path key,
         BiFunction<? super Path, ? super CachedIndexInput, ? extends CachedIndexInput> remappingFunction
     ) {
-        return theCache.computeIfPresent(key, remappingFunction);
+        return theCache.compute(key, remappingFunction);
     }

     /**
@@ -91,11 +86,6 @@ public void remove(final Path filePath) {
         theCache.remove(filePath);
     }

-    @Override
-    public void removeAll(Iterable<? extends Path> keys) {
-        theCache.removeAll(keys);
-    }
-
     @Override
     public void clear() {
         theCache.clear();
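
For context on the interface change above (computeIfPresent giving way to
compute), a small comparison against plain java.util.concurrent.ConcurrentHashMap,
whose per-key atomicity the cache methods are loosely analogous to; this is an
illustration only, not the OpenSearch RefCountedCache implementation:

// For comparison only: plain ConcurrentHashMap, not the RefCountedCache API.
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;

class ComputeVsComputeIfPresent {
    private final ConcurrentHashMap<Path, String> map = new ConcurrentHashMap<>();

    String withComputeIfPresent(Path key) {
        String value = map.computeIfPresent(key, (k, v) -> v);   // returns null on a miss
        if (value == null) {
            value = "loaded:" + key;
            map.put(key, value);   // separate step: another thread (or an evictor) can interleave here
        }
        return value;
    }

    String withCompute(Path key) {
        // One atomic step per key: the remapping function sees null on a miss and can
        // load the value itself, so there is no read-check-write gap to race against.
        return map.compute(key, (k, v) -> v != null ? v : "loaded:" + k);
    }
}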

This file was deleted.

@@ -22,7 +22,6 @@
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Objects;

 /**
  * This acts as entry point to fetch {@link BlobFetchRequest} and return actual {@link IndexInput}. Utilizes the BlobContainer interface to
@@ -34,12 +33,10 @@ public class TransferManager {
     private static final Logger logger = LogManager.getLogger(TransferManager.class);

     private final BlobContainer blobContainer;
-    private final ConcurrentInvocationLinearizer<Path, IndexInput> invocationLinearizer;
     private final FileCache fileCache;

     public TransferManager(final BlobContainer blobContainer, final FileCache fileCache) {
         this.blobContainer = blobContainer;
-        this.invocationLinearizer = new ConcurrentInvocationLinearizer<>();
         this.fileCache = fileCache;
     }

@@ -48,53 +45,46 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCache) {
      * @param blobFetchRequest to fetch
      * @return future of IndexInput augmented with internal caching maintenance tasks
      */
-    public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws InterruptedException, IOException {
-        final IndexInput indexInput = invocationLinearizer.linearize(
-            blobFetchRequest.getFilePath(),
-            p -> fetchOriginBlob(blobFetchRequest)
-        );
-        return indexInput.clone();
-    }
+    public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
+        final Path key = blobFetchRequest.getFilePath();

-    /**
-     * Fetches the "origin" IndexInput from the cache, downloading it first if it is
-     * not already cached. This instance must be cloned before using. This method is
-     * accessed through the ConcurrentInvocationLinearizer so read-check-write is
-     * acceptable here
-     */
-    private IndexInput fetchOriginBlob(BlobFetchRequest blobFetchRequest) throws IOException {
-        // check if the origin is already in block cache
-        IndexInput origin = fileCache.computeIfPresent(blobFetchRequest.getFilePath(), (path, cachedIndexInput) -> {
-            if (cachedIndexInput.isClosed()) {
-                // if it's already in the file cache, but closed, open it and replace the original one
+        final IndexInput origin = fileCache.compute(key, (path, cachedIndexInput) -> {
+            if (cachedIndexInput == null) {
                 try {
-                    IndexInput luceneIndexInput = blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ);
-                    return new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), luceneIndexInput);
-                } catch (IOException ioe) {
-                    logger.warn("Open index input " + blobFetchRequest.getFilePath() + " got error ", ioe);
-                    // open failed so return null to download the file again
+                    return new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), downloadBlockLocally(blobFetchRequest));
+                } catch (IOException e) {
+                    logger.warn("Failed to download " + blobFetchRequest.getFilePath(), e);
                     return null;
                 }
+
+            } else {
+                if (cachedIndexInput.isClosed()) {
+                    // if it's already in the file cache, but closed, open it and replace the original one
+                    try {
+                        final IndexInput luceneIndexInput = blobFetchRequest.getDirectory()
+                            .openInput(blobFetchRequest.getFileName(), IOContext.READ);
+                        return new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), luceneIndexInput);
+                    } catch (IOException e) {
+                        logger.warn("Failed to open existing file for " + blobFetchRequest.getFilePath(), e);
+                        return null;
+                    }
+                }
+                // already in the cache and ready to be used (open)
+                return cachedIndexInput;
             }
-            // already in the cache and ready to be used (open)
-            return cachedIndexInput;
         });

-        if (Objects.isNull(origin)) {
-            // origin is not in file cache, download origin
-
-            // open new origin
-            IndexInput downloaded = downloadBlockLocally(blobFetchRequest);
-
-            // refcount = 0 at the beginning
-            FileCachedIndexInput newOrigin = new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), downloaded);
+        if (origin == null) {
+            throw new IOException("Failed to create IndexInput for " + blobFetchRequest.getFileName());
+        }

-            // put origin into file cache
-            fileCache.put(blobFetchRequest.getFilePath(), newOrigin);
-            origin = newOrigin;
+        // Origin was either retrieved from the cache or newly added, either
+        // way the reference count has been incremented by one. We can only
+        // decrement this reference _after_ creating the clone to be returned.
+        try {
+            return origin.clone();
+        } finally {
+            fileCache.decRef(key);
         }
-        return origin;
     }

     private IndexInput downloadBlockLocally(BlobFetchRequest blobFetchRequest) throws IOException {