Skip to content

Commit c071c2e

Browse files
committed
Revert "Fix test failures"
This reverts commit e280fe6. Revert "Change compute to put to prevent unnecessary cache misses" This reverts commit c4f8e86. Revert "Fix fetchBlob in TransferManager to compute from file cache only if entry is not present" This reverts commit bf175e7. Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
1 parent 9281079 commit c071c2e

File tree

2 files changed

+18
-32
lines changed

2 files changed

+18
-32
lines changed

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,17 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
7474

7575
try {
7676
return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> {
77-
CachedIndexInput cacheEntry = fileCache.get(key);
78-
if (cacheEntry == null || cacheEntry.isClosed()) {
79-
cacheEntry = new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
80-
fileCache.put(key, cacheEntry);
81-
}
77+
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
78+
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
79+
logger.trace("Transfer Manager - IndexInput closed or not in cache");
80+
// Doesn't exist or is closed, either way create a new one
81+
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
82+
} else {
83+
logger.trace("Transfer Manager - Already in cache");
84+
// already in the cache and ready to be used (open)
85+
return cachedIndexInput;
86+
}
87+
});
8288

8389
// Cache entry was either retrieved from the cache or newly added, either
8490
// way the reference count has been incremented by one. We can only

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
import org.apache.lucene.store.IndexInput;
1414
import org.apache.lucene.store.MMapDirectory;
1515
import org.apache.lucene.store.SimpleFSLockFactory;
16-
import org.opensearch.common.breaker.TestCircuitBreaker;
17-
import org.opensearch.core.common.breaker.CircuitBreakingException;
16+
import org.opensearch.core.common.breaker.CircuitBreaker;
17+
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
1818
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
1919
import org.opensearch.index.store.remote.filecache.FileCache;
2020
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
@@ -38,8 +38,11 @@
3838
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
3939
public abstract class TransferManagerTestCase extends OpenSearchTestCase {
4040
protected static final int EIGHT_MB = 1024 * 1024 * 8;
41-
protected final TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
42-
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1, testCircuitBreaker);
41+
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
42+
EIGHT_MB * 2,
43+
1,
44+
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
45+
);
4346
protected MMapDirectory directory;
4447
protected TransferManager transferManager;
4548

@@ -153,29 +156,6 @@ public void testDownloadFails() throws Exception {
153156
MatcherAssert.assertThat(fileCache.usage(), equalTo(0L));
154157
}
155158

156-
public void testCircuitBreakerWhileDownloading() throws IOException {
157-
// fetch blob when circuit breaking is not tripping
158-
try (IndexInput i = fetchBlobWithName("1")) {
159-
assertIndexInputIsFunctional(i);
160-
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
161-
}
162-
// should have entry in file cache
163-
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L));
164-
MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB));
165-
166-
// start tripping the circuit breaker
167-
testCircuitBreaker.startBreaking();
168-
169-
// fetch blob which already had entry in file cache, should not encounter circuit breaking exceptions
170-
try (IndexInput i = fetchBlobWithName("1")) {
171-
assertIndexInputIsFunctional(i);
172-
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
173-
}
174-
175-
// fetch new blob - should encounter circuit breaking exception
176-
expectThrows(CircuitBreakingException.class, () -> fetchBlobWithName("2"));
177-
}
178-
179159
public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception {
180160
// Mock a call for a blob that will block until the latch is released,
181161
// then start the fetch for that blob on a separate thread

0 commit comments

Comments
 (0)