Skip to content

Commit

Permalink
use atomic long to count cache files
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Nov 17, 2020
1 parent f69dde9 commit 395845d
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class CacheService extends AbstractLifecycleComponent {

private final ThreadPool threadPool;
private final ConcurrentLinkedQueue<CacheFile> cacheFilesToSync;
private final AtomicLong numberOfCacheFilesToSync;
private final CacheSynchronizationTask cacheSyncTask;
private final Cache<CacheKey, CacheFile> cache;
private final ByteSizeValue cacheSize;
Expand All @@ -117,6 +119,7 @@ public CacheService(
// are done with reading/writing the cache file
.removalListener(notification -> onCacheFileRemoval(notification.getValue()))
.build();
this.numberOfCacheFilesToSync = new AtomicLong();
this.cacheFilesToSync = new ConcurrentLinkedQueue<>();
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.maxCacheFilesToSyncAtOnce = SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING.get(settings);
Expand Down Expand Up @@ -218,6 +221,7 @@ private void setMaxCacheFilesToSyncAtOnce(int maxCacheFilesToSyncAtOnce) {
void onCacheFileUpdate(CacheFile cacheFile) {
assert cacheFile != null;
cacheFilesToSync.offer(cacheFile);
numberOfCacheFilesToSync.incrementAndGet();
}

/**
Expand Down Expand Up @@ -251,7 +255,7 @@ protected void synchronizeCache() {
long count = 0L;
final Set<Path> cacheDirs = new HashSet<>();
final long startTimeNanos = threadPool.relativeTimeInNanos();
final int maxCacheFilesToSync = Math.min(cacheFilesToSync.size(), this.maxCacheFilesToSyncAtOnce);
final long maxCacheFilesToSync = Math.min(numberOfCacheFilesToSync.get(), this.maxCacheFilesToSyncAtOnce);
for (long i = 0L; i < maxCacheFilesToSync; i++) {
if (lifecycleState() != Lifecycle.State.STARTED) {
logger.debug("stopping cache synchronization (cache service is closing)");
Expand All @@ -262,6 +266,8 @@ protected void synchronizeCache() {
logger.debug("stopping cache synchronization (no more cache files to fsync)");
break;
}
final long value = numberOfCacheFilesToSync.decrementAndGet();
assert value >= 0 : value;
final Path cacheFilePath = cacheFile.getFile();
try {
final SortedSet<Tuple<Long, Long>> ranges = cacheFile.fsync();
Expand Down

0 comments on commit 395845d

Please sign in to comment.