Skip to content

Commit bf175e7

Browse files
committed
Fix fetchBlob in TransferManager to compute from file cache only if entry is not present
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
1 parent 0874e58 commit bf175e7

File tree

2 files changed

+34
-18
lines changed

2 files changed

+34
-18
lines changed

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,13 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
7474

7575
try {
7676
return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> {
77-
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
78-
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
79-
logger.trace("Transfer Manager - IndexInput closed or not in cache");
80-
// Doesn't exist or is closed, either way create a new one
81-
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
82-
} else {
83-
logger.trace("Transfer Manager - Already in cache");
84-
// already in the cache and ready to be used (open)
85-
return cachedIndexInput;
86-
}
87-
});
77+
CachedIndexInput cacheEntry = fileCache.get(key);
78+
if (cacheEntry == null || cacheEntry.isClosed()) {
79+
cacheEntry = fileCache.compute(
80+
key,
81+
(path, cachedIndexInput) -> new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest)
82+
);
83+
}
8884

8985
// Cache entry was either retrieved from the cache or newly added, either
9086
// way the reference count has been incremented by one. We can only

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
import org.apache.lucene.store.IndexInput;
1414
import org.apache.lucene.store.MMapDirectory;
1515
import org.apache.lucene.store.SimpleFSLockFactory;
16-
import org.opensearch.core.common.breaker.CircuitBreaker;
17-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
16+
import org.opensearch.common.breaker.TestCircuitBreaker;
17+
import org.opensearch.core.common.breaker.CircuitBreakingException;
1818
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
1919
import org.opensearch.index.store.remote.filecache.FileCache;
2020
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
@@ -38,11 +38,8 @@
3838
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
3939
public abstract class TransferManagerTestCase extends OpenSearchTestCase {
4040
protected static final int EIGHT_MB = 1024 * 1024 * 8;
41-
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
42-
EIGHT_MB * 2,
43-
1,
44-
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
45-
);
41+
protected final TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
42+
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1, testCircuitBreaker);
4643
protected MMapDirectory directory;
4744
protected TransferManager transferManager;
4845

@@ -156,6 +153,29 @@ public void testDownloadFails() throws Exception {
156153
MatcherAssert.assertThat(fileCache.usage(), equalTo(0L));
157154
}
158155

156+
public void testCircuitBreakerWhileDownloading() throws IOException {
157+
// fetch blob when circuit breaking is not tripping
158+
try (IndexInput i = fetchBlobWithName("1")) {
159+
assertIndexInputIsFunctional(i);
160+
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
161+
}
162+
// should have entry in file cache
163+
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L));
164+
MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB));
165+
166+
// start tripping the circuit breaker
167+
testCircuitBreaker.startBreaking();
168+
169+
// fetch blob which already had entry in file cache, should not encounter circuit breaking exceptions
170+
try (IndexInput i = fetchBlobWithName("1")) {
171+
assertIndexInputIsFunctional(i);
172+
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
173+
}
174+
175+
// fetch new blob - should encounter circuit breaking exception
176+
expectThrows(CircuitBreakingException.class, () -> fetchBlobWithName("2"));
177+
}
178+
159179
public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception {
160180
// Mock a call for a blob that will block until the latch is released,
161181
// then start the fetch for that blob on a separate thread

0 commit comments

Comments
 (0)