Skip to content

Commit 353bcdf

Browse files
Added Changes to support pinning in RefCountedCache
Signed-off-by: Mayank Sharma <smynk@amazon.com>
1 parent 38847e8 commit 353bcdf

File tree

10 files changed

+311
-18
lines changed

10 files changed

+311
-18
lines changed

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,19 +322,15 @@ public String toString() {
322322
*/
323323
public void afterSyncToRemote(String file) {
324324
ensureOpen();
325-
/*
326-
Decrementing the refCount here for the path so that it becomes eligible for eviction
327-
This is a temporary solution until pinning support is added
328-
TODO - Unpin the files here from FileCache so that they become eligible for eviction, once pinning/unpinning support is added in FileCache
329-
Uncomment the below commented line(to remove the file from cache once uploaded) to test block based functionality
330-
*/
325+
331326
logger.trace(
332327
"Composite Directory[{}]: File {} uploaded to Remote Store and now can be eligible for eviction in FileCache",
333328
this::toString,
334329
() -> file
335330
);
336-
fileCache.decRef(getFilePath(file));
337-
// fileCache.remove(getFilePath(fileName));
331+
final Path filePath = getFilePath(file);
332+
fileCache.unpin(filePath);
333+
fileCache.remove(filePath);
338334
}
339335

340336
// Visibility public since we need it in IT tests
@@ -385,12 +381,9 @@ private String[] getRemoteFiles() throws IOException {
385381

386382
private void cacheFile(String name) throws IOException {
387383
Path filePath = getFilePath(name);
388-
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
389-
// so that it can be evicted after that
390-
// this is just a temporary solution, will pin the file once support for that is added in FileCache
391-
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
392-
// successfully uploaded to Remote
384+
393385
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
386+
fileCache.pin(filePath);
394387
}
395388

396389
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,26 @@ public void decRef(Path key) {
121121
theCache.decRef(key);
122122
}
123123

124+
/**
125+
* Pins the key in the cache, preventing it from being evicted.
126+
*
127+
* @param key
128+
*/
129+
@Override
130+
public void pin(Path key) {
131+
theCache.pin(key);
132+
}
133+
134+
/**
135+
* Unpins the key in the cache, allowing it to be evicted.
136+
*
137+
* @param key
138+
*/
139+
@Override
140+
public void unpin(Path key) {
141+
theCache.unpin(key);
142+
}
143+
124144
@Override
125145
public long prune() {
126146
return theCache.prune();
@@ -141,6 +161,16 @@ public long activeUsage() {
141161
return theCache.activeUsage();
142162
}
143163

164+
/**
165+
* Returns the pinned usage of this cache.
166+
*
167+
* @return the combined pinned weight of the values in this cache.
168+
*/
169+
@Override
170+
public long pinnedUsage() {
171+
return theCache.pinnedUsage();
172+
}
173+
144174
@Override
145175
public CacheStats stats() {
146176
return theCache.stats();

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,18 @@ static class Node<K, V> {
7171

7272
int refCount;
7373

74+
boolean pinned;
75+
7476
Node(K key, V value, long weight) {
7577
this.key = key;
7678
this.value = value;
7779
this.weight = weight;
7880
this.refCount = 0;
81+
this.pinned = false;
7982
}
8083

8184
public boolean evictable() {
82-
return (refCount == 0);
85+
return ((refCount == 0) && (pinned == false));
8386
}
8487
}
8588

@@ -193,6 +196,7 @@ public void clear() {
193196
}
194197
statsCounter.resetUsage();
195198
statsCounter.resetActiveUsage();
199+
statsCounter.resetPinnedUsage();
196200
} finally {
197201
lock.unlock();
198202
}
@@ -252,6 +256,63 @@ public void decRef(K key) {
252256
}
253257
}
254258

259+
/**
260+
* Pins the key in the cache, preventing it from being evicted.
261+
*
262+
* @param key
263+
*/
264+
@Override
265+
public void pin(K key) {
266+
Objects.requireNonNull(key);
267+
lock.lock();
268+
try {
269+
Node<K, V> node = data.get(key);
270+
if (node != null) {
271+
if (node.pinned == false) {
272+
statsCounter.recordPinnedUsage(node.weight, false);
273+
}
274+
275+
if (node.evictable()) {
276+
// since its pinned, we should remove it from eviction list
277+
lru.remove(node.key, node);
278+
}
279+
280+
node.pinned = true;
281+
}
282+
} finally {
283+
lock.unlock();
284+
}
285+
}
286+
287+
/**
288+
* Unpins the key in the cache, allowing it to be evicted.
289+
*
290+
* @param key
291+
*/
292+
@Override
293+
public void unpin(K key) {
294+
Objects.requireNonNull(key);
295+
lock.lock();
296+
297+
try {
298+
Node<K, V> node = data.get(key);
299+
if (node != null && (node.pinned == true)) {
300+
301+
node.pinned = false;
302+
303+
if (node.evictable()) {
304+
// if it becomes evictable, we should add it to eviction list
305+
lru.put(node.key, node);
306+
}
307+
308+
statsCounter.recordPinnedUsage(node.weight, true);
309+
}
310+
311+
} finally {
312+
lock.unlock();
313+
}
314+
}
315+
255316
@Override
256317
public long prune(Predicate<K> keyPredicate) {
257318
long sum = 0L;
@@ -286,7 +347,6 @@ public long usage() {
286347
}
287348

288349
@Override
289-
290350
public long activeUsage() {
291351
lock.lock();
292352
try {
@@ -296,6 +356,21 @@ public long activeUsage() {
296356
}
297357
}
298358

359+
/**
360+
* Returns the pinned usage of this cache.
361+
*
362+
* @return the combined pinned weight of the values in this cache.
363+
*/
364+
@Override
365+
public long pinnedUsage() {
366+
lock.lock();
367+
try {
368+
return statsCounter.pinnedUsage();
369+
} finally {
370+
lock.unlock();
371+
}
372+
}
373+
299374
@Override
300375
public CacheStats stats() {
301376
lock.lock();
@@ -348,7 +423,7 @@ private void replaceNode(Node<K, V> node, V newValue) {
348423
node.weight = newWeight;
349424

350425
// update stats
351-
statsCounter.recordReplacement(oldValue, newValue, oldWeight, newWeight, node.refCount > 0);
426+
statsCounter.recordReplacement(oldValue, newValue, oldWeight, newWeight, node.refCount > 0, node.pinned);
352427
listener.onRemoval(new RemovalNotification<>(node.key, oldValue, RemovalReason.REPLACED));
353428
}
354429
incRef(node.key);
@@ -364,6 +439,11 @@ private void removeNode(K key) {
364439
if (node.evictable()) {
365440
lru.remove(node.key);
366441
}
442+
443+
if (node.pinned) {
444+
statsCounter.recordPinnedUsage(node.weight, true);
445+
}
446+
367447
statsCounter.recordRemoval(node.value, node.weight);
368448
listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT));
369449
}

server/src/main/java/org/opensearch/index/store/remote/utils/cache/RefCountedCache.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,18 @@ public interface RefCountedCache<K, V> {
7676
*/
7777
void decRef(K key);
7878

79+
/**
80+
* Pins the key in the cache, preventing it from being evicted.
81+
* @param key
82+
*/
83+
void pin(K key);
84+
85+
/**
86+
* Unpins the key in the cache, allowing it to be evicted.
87+
* @param key
88+
*/
89+
void unpin(K key);
90+
7991
/**
8092
* Removes all cache entries with a reference count of zero, regardless of current capacity.
8193
*
@@ -107,6 +119,13 @@ default long prune() {
107119
*/
108120
long activeUsage();
109121

122+
/**
123+
* Returns the pinned usage of this cache.
124+
*
125+
* @return the combined pinned weight of the values in this cache.
126+
*/
127+
long pinnedUsage();
128+
110129
/**
111130
* Returns a current snapshot of this cache's cumulative statistics. All statistics are
112131
* initialized to zero, and are monotonically increasing over the lifetime of the cache.

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,18 @@ public void decRef(K key) {
133133
segmentFor(key).decRef(key);
134134
}
135135

136+
@Override
137+
public void pin(K key) {
138+
if (key == null) throw new NullPointerException();
139+
segmentFor(key).pin(key);
140+
}
141+
142+
@Override
143+
public void unpin(K key) {
144+
if (key == null) throw new NullPointerException();
145+
segmentFor(key).unpin(key);
146+
}
147+
136148
@Override
137149
public long prune() {
138150
long sum = 0L;
@@ -172,6 +184,21 @@ public long activeUsage() {
172184
return totalActiveUsage;
173185
}
174186

187+
/**
188+
* Returns the pinned usage of this cache.
189+
*
190+
* @return the combined pinned weight of the values in this cache.
191+
*/
192+
@Override
193+
public long pinnedUsage() {
194+
long totalPinnedUsage = 0L;
195+
for (RefCountedCache<K, V> cache : table) {
196+
CacheStats c = cache.stats();
197+
totalPinnedUsage += c.pinnedUsage();
198+
}
199+
return totalPinnedUsage;
200+
}
201+
175202
@Override
176203
public CacheStats stats() {
177204

@@ -184,6 +211,7 @@ public CacheStats stats() {
184211
long totalEvictionWeight = 0L;
185212
long totalUsage = 0L;
186213
long totalActiveUsage = 0L;
214+
long totalPinnedUsage = 0L;
187215

188216
// full file counts
189217
long totalFullFileHitCount = 0L;
@@ -206,6 +234,7 @@ public CacheStats stats() {
206234
totalEvictionWeight += c.evictionWeight();
207235
totalUsage += c.usage();
208236
totalActiveUsage += c.activeUsage();
237+
totalPinnedUsage += c.pinnedUsage();
209238

210239
CacheStats.FullFileStats fullFileStats = c.fullFileStats();
211240

@@ -229,6 +258,7 @@ public CacheStats stats() {
229258
totalEvictionWeight,
230259
totalUsage,
231260
totalActiveUsage,
261+
totalPinnedUsage,
232262
totalFullFileHitCount,
233263
totalFullFileRemoveCount,
234264
totalFullFileRemoveWeight,

server/src/main/java/org/opensearch/index/store/remote/utils/cache/stats/CacheStats.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public final class CacheStats {
3030
private final long evictionWeight;
3131
private final long usage;
3232
private final long activeUsage;
33+
private final long pinnedUsage;
3334
private final FullFileStats fullFileStats;
3435

3536
/**
@@ -191,6 +192,7 @@ public CacheStats(
191192
long evictionWeight,
192193
long usage,
193194
long activeUsage,
195+
long pinnedUsage,
194196
long fullFileHitCount,
195197
long fullFileRemoveCount,
196198
long fullFileRemoveWeight,
@@ -200,6 +202,7 @@ public CacheStats(
200202
long fullFileUsage,
201203
long fullFileActiveUsage
202204
) {
205+
203206
if ((hitCount < 0)
204207
|| (missCount < 0)
205208
|| (removeCount < 0)
@@ -218,6 +221,7 @@ public CacheStats(
218221
this.evictionWeight = evictionWeight;
219222
this.usage = usage;
220223
this.activeUsage = activeUsage;
224+
this.pinnedUsage = pinnedUsage;
221225
this.fullFileStats = new FullFileStats(
222226
fullFileHitCount,
223227
fullFileRemoveCount,
@@ -355,6 +359,15 @@ public long activeUsage() {
355359
return activeUsage;
356360
}
357361

362+
/**
363+
* Returns the total pinned weight of the cache.
364+
*
365+
* @return the total pinned weight of the cache
366+
*/
367+
public long pinnedUsage() {
368+
return pinnedUsage;
369+
}
370+
358371
/**
359372
* Returns full file stats for the cache.
360373
* @return
@@ -375,6 +388,7 @@ public int hashCode() {
375388
evictionWeight,
376389
usage,
377390
activeUsage,
391+
pinnedUsage,
378392
fullFileStats
379393
);
380394
}
@@ -396,6 +410,7 @@ public boolean equals(Object o) {
396410
&& evictionWeight == other.evictionWeight
397411
&& usage == other.usage
398412
&& activeUsage == other.activeUsage
413+
&& pinnedUsage == other.pinnedUsage
399414
&& fullFileStats.equals(other.fullFileStats);
400415
}
401416

0 commit comments

Comments
 (0)