4 changes: 0 additions & 4 deletions common/network-shuffle/pom.xml
@@ -58,10 +58,6 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -20,7 +20,10 @@
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -29,10 +32,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.collect.Maps;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@@ -108,10 +112,16 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
this.registeredExecutorFile = registeredExecutorFile;
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
shuffleIndexCache = Caffeine.newBuilder()
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
.weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
.build(ShuffleIndexInformation::new);
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.build(indexCacheLoader);
db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
if (db != null) {
executors = reloadRegisteredExecutors(db);
@@ -303,7 +313,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (CompletionException e) {
} catch (ExecutionException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
}
}
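Note: this file swaps the Caffeine index cache back to a Guava one. The restored pattern is a weight-bounded LoadingCache built from an explicit CacheLoader, and Guava's get() declares a checked ExecutionException (Caffeine threw the unchecked CompletionException), which is why the catch clause changes. A minimal Scala sketch of that pattern, assuming an illustrative IndexInfo class and size bound rather than the actual Spark types:

```scala
import java.io.File
import java.util.concurrent.ExecutionException

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, Weigher}

object IndexCacheSketch {
  // Stand-in for ShuffleIndexInformation: reads an index file and reports its size.
  final class IndexInfo(val file: File) {
    val size: Int = file.length().toInt
  }

  private val loader = new CacheLoader[File, IndexInfo]() {
    override def load(file: File): IndexInfo = new IndexInfo(file)
  }

  // maximumWeight caps the total weighed size of cached entries rather than the
  // entry count, so the weigher reports each entry's in-memory footprint.
  private val indexCache: LoadingCache[File, IndexInfo] = CacheBuilder.newBuilder()
    .maximumWeight(100L * 1024 * 1024)
    .weigher[File, IndexInfo](new Weigher[File, IndexInfo] {
      override def weigh(file: File, info: IndexInfo): Int = info.size
    })
    .build(loader)

  def lookup(indexFile: File): IndexInfo =
    try {
      indexCache.get(indexFile)
    } catch {
      // Guava wraps loader failures in a checked ExecutionException.
      case e: ExecutionException =>
        throw new RuntimeException("Failed to open file: " + indexFile, e)
    }
}
```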
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -31,22 +31,23 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.roaringbitmap.RoaringBitmap;
@@ -121,10 +122,16 @@ public RemoteBlockPushResolver(TransportConf conf) {
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
indexCache = Caffeine.newBuilder()
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
indexCache = CacheBuilder.newBuilder()
.maximumWeight(conf.mergedIndexCacheSize())
.weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
.build(ShuffleIndexInformation::new);
.build(indexCacheLoader);
}

@VisibleForTesting
@@ -315,7 +322,7 @@ public ManagedBuffer getMergedBlockData(
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
return new FileSegmentManagedBuffer(
conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
} catch (CompletionException e) {
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"Failed to open merged shuffle index file %s", indexFile.getPath()), e);
}
12 changes: 0 additions & 12 deletions core/benchmarks/LocalCacheBenchmark-jdk11-results.txt

This file was deleted.

12 changes: 0 additions & 12 deletions core/benchmarks/LocalCacheBenchmark-results.txt

This file was deleted.

4 changes: 0 additions & 4 deletions core/pom.xml
@@ -47,10 +47,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,7 +26,7 @@ import scala.collection.concurrent
import scala.collection.mutable
import scala.util.Properties

import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.DeveloperApi
@@ -77,7 +77,7 @@ class SparkEnv (
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata =
Caffeine.newBuilder().softValues().build[String, AnyRef]().asMap()
CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
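Note: hadoopJobMetadata goes back to Guava's soft-value cache exposed as a plain ConcurrentMap. A small Scala sketch of that idiom, with illustrative names:

```scala
import java.util.concurrent.ConcurrentMap

import com.google.common.cache.CacheBuilder

object SoftValueMapSketch {
  // Values are held through soft references, so the GC may reclaim them under
  // memory pressure; a reclaimed key simply looks absent on the next lookup.
  val metadata: ConcurrentMap[String, AnyRef] =
    CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()

  def getOrCompute(key: String)(compute: => AnyRef): AnyRef = {
    val cached = metadata.get(key)
    if (cached != null) {
      cached
    } else {
      val value = compute
      // putIfAbsent keeps the first value if another thread raced us here.
      Option(metadata.putIfAbsent(key, value)).getOrElse(value)
    }
  }
}
```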

private[spark] var driverTmpDir: Option[String] = None

core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -18,14 +18,15 @@
package org.apache.spark.deploy.history

import java.util.NoSuchElementException
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutionException
import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import scala.collection.JavaConverters._

import com.codahale.metrics.{Counter, MetricRegistry, Timer}
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, RemovalCause, RemovalListener}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
import com.google.common.util.concurrent.UncheckedExecutionException
import org.eclipse.jetty.servlet.FilterHolder

import org.apache.spark.internal.Logging
@@ -61,27 +62,21 @@ private[history] class ApplicationCache(

/**
* Removal event notifies the provider to detach the UI.
* @param key removal key
* @param value removal value
* @param cause the reason why a `CacheEntry` was removed, it should
* always be `SIZE` because `appCache` configured with
* `maximumSize` eviction strategy
* @param rm removal notification
*/
override def onRemoval(key: CacheKey, value: CacheEntry, cause: RemovalCause): Unit = {
override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
metrics.evictionCount.inc()
logDebug(s"Evicting entry $key")
operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui)
val key = rm.getKey
logDebug(s"Evicting entry ${key}")
operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui)
}
}

private val appCache: LoadingCache[CacheKey, CacheEntry] = {
val builder = Caffeine.newBuilder()
.maximumSize(retainedApplications)
.removalListener(removalListener)
// SPARK-34309: Use custom Executor to compatible with
// the data eviction behavior of Guava cache
.executor((command: Runnable) => command.run())
builder.build[CacheKey, CacheEntry](appLoader)
CacheBuilder.newBuilder()
.maximumSize(retainedApplications)
.removalListener(removalListener)
.build(appLoader)
}

/**
@@ -91,9 +86,9 @@

def get(appId: String, attemptId: Option[String] = None): CacheEntry = {
try {
appCache.get(CacheKey(appId, attemptId))
appCache.get(new CacheKey(appId, attemptId))
} catch {
case e @ (_: CompletionException | _: RuntimeException) =>
case e @ (_: ExecutionException | _: UncheckedExecutionException) =>
throw Option(e.getCause()).getOrElse(e)
}
}
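Note: two Guava behaviors matter here. Evictions arrive as a RemovalNotification and the listener runs synchronously on the thread performing cache maintenance, so the Caffeine-only same-thread Executor workaround (SPARK-34309) is dropped; and loader failures surface as a checked ExecutionException or an UncheckedExecutionException whose cause is rethrown. A self-contained Scala sketch with illustrative Key and Entry types, not the actual CacheKey/CacheEntry:

```scala
import java.util.concurrent.ExecutionException

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
import com.google.common.util.concurrent.UncheckedExecutionException

object EvictingCacheSketch {
  final case class Key(id: String)
  final case class Entry(payload: String)

  // Guava delivers evictions as a RemovalNotification carrying key, value and
  // cause, and invokes the listener on the thread doing cache maintenance.
  private val onEviction = new RemovalListener[Key, Entry] {
    override def onRemoval(rm: RemovalNotification[Key, Entry]): Unit =
      println(s"evicted ${rm.getKey} (${rm.getCause})")
  }

  private val loader = new CacheLoader[Key, Entry]() {
    override def load(key: Key): Entry = {
      if (key.id.isEmpty) throw new IllegalArgumentException("empty id")
      Entry(s"loaded-${key.id}")
    }
  }

  private val cache: LoadingCache[Key, Entry] = CacheBuilder.newBuilder()
    .maximumSize(2)
    .removalListener(onEviction)
    .build(loader)

  // Checked loader failures are wrapped in ExecutionException, unchecked ones
  // in UncheckedExecutionException; rethrowing the cause restores the original.
  def get(key: Key): Entry =
    try {
      cache.get(key)
    } catch {
      case e @ (_: ExecutionException | _: UncheckedExecutionException) =>
        throw Option(e.getCause).getOrElse(e)
    }
}
```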
@@ -132,7 +127,7 @@ private[history] class ApplicationCache(
}

/** @return Number of cached UIs. */
def size(): Long = appCache.estimatedSize()
def size(): Long = appCache.size()

private def time[T](t: Timer)(f: => T): T = {
val timeCtx = t.time()
@@ -202,7 +197,7 @@ private[history] class ApplicationCache(
val sb = new StringBuilder(s"ApplicationCache(" +
s" retainedApplications= $retainedApplications)")
sb.append(s"; time= ${clock.getTimeMillis()}")
sb.append(s"; entry count= ${appCache.estimatedSize()}\n")
sb.append(s"; entry count= ${appCache.size()}\n")
sb.append("----\n")
appCache.asMap().asScala.foreach {
case(key, entry) => sb.append(s" $key -> $entry\n")
core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.hadoop.fs.Path

import org.apache.spark._
@@ -85,18 +85,16 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
}

// Cache of preferred locations of checkpointed files.
@transient private[spark] lazy val cachedPreferredLocations = {
val builder = Caffeine.newBuilder()
.expireAfterWrite(
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
TimeUnit.MINUTES)
val loader = new CacheLoader[Partition, Seq[String]]() {
override def load(split: Partition): Seq[String] = {
getPartitionBlockLocations(split)
}
}
builder.build[Partition, Seq[String]](loader)
}
@transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
.expireAfterWrite(
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
TimeUnit.MINUTES)
.build(
new CacheLoader[Partition, Seq[String]]() {
override def load(split: Partition): Seq[String] = {
getPartitionBlockLocations(split)
}
})
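Note: cachedPreferredLocations returns to a Guava cache with a write TTL, so resolved locations age out and are recomputed by the loader on the next access. A sketch under illustrative names (resolveLocations stands in for getPartitionBlockLocations, and the 60-minute TTL is an assumption):

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object ExpiringLocationsSketch {
  // Hypothetical resolver standing in for the real block-location lookup.
  private def resolveLocations(partitionId: Integer): Seq[String] =
    Seq(s"host-${partitionId % 3}")

  // Entries are dropped (and recomputed on the next access) once the write TTL
  // elapses, so cached preferred locations cannot go stale indefinitely.
  val preferredLocations: LoadingCache[Integer, Seq[String]] =
    CacheBuilder.newBuilder()
      .expireAfterWrite(60, TimeUnit.MINUTES)
      .build(new CacheLoader[Integer, Seq[String]]() {
        override def load(partitionId: Integer): Seq[String] =
          resolveLocations(partitionId)
      })
}
```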

// Returns the block locations of given partition on file system.
private def getPartitionBlockLocations(split: Partition): Seq[String] = {
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,7 +34,7 @@ import scala.util.{Failure, Random, Success, Try}
import scala.util.control.NonFatal

import com.codahale.metrics.{MetricRegistry, MetricSet}
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.cache.CacheBuilder

import org.apache.spark._
import org.apache.spark.errors.SparkCoreErrors
@@ -123,7 +123,7 @@ private[spark] class HostLocalDirManager(
blockStoreClient: BlockStoreClient) extends Logging {

private val executorIdToLocalDirsCache =
Caffeine
CacheBuilder
.newBuilder()
.maximumSize(cacheSize)
.build[String, Array[String]]()
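Note: this cache is built without a loader, so it behaves as a bounded map populated explicitly with put and queried with getIfPresent. A minimal Scala sketch with an assumed size bound and names:

```scala
import com.google.common.cache.{Cache, CacheBuilder}

object LocalDirsCacheSketch {
  // A plain (non-loading) cache: entries are written explicitly and old ones
  // are evicted once the size bound is exceeded.
  private val executorIdToLocalDirs: Cache[String, Array[String]] =
    CacheBuilder.newBuilder()
      .maximumSize(1000)
      .build[String, Array[String]]()

  def record(executorId: String, dirs: Array[String]): Unit =
    executorIdToLocalDirs.put(executorId, dirs)

  def lookup(executorId: String): Option[Array[String]] =
    Option(executorIdToLocalDirs.getIfPresent(executorId))
}
```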
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage

import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.google.common.cache.{CacheBuilder, CacheLoader}

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
@@ -136,14 +136,11 @@ private[spark] object BlockManagerId {
* The max cache size is hardcoded to 10000, since the size of a BlockManagerId
* object is about 48B, the total memory cost should be below 1MB which is feasible.
*/
val blockManagerIdCache = {
Caffeine.newBuilder()
.maximumSize(10000)
.build[BlockManagerId, BlockManagerId](
new CacheLoader[BlockManagerId, BlockManagerId]() {
override def load(id: BlockManagerId): BlockManagerId = id
})
}
val blockManagerIdCache = CacheBuilder.newBuilder()
.maximumSize(10000)
.build(new CacheLoader[BlockManagerId, BlockManagerId]() {
override def load(id: BlockManagerId) = id
})

def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
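Note: blockManagerIdCache uses a loader that returns its key, turning the cache into a bounded interner that deduplicates structurally equal ids. A Scala sketch with an illustrative Id type in place of BlockManagerId:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object IdInternerSketch {
  final case class Id(host: String, port: Int)

  // The loader just returns its key, so the cache acts as a bounded interner:
  // repeated equal Ids resolve to one canonical instance, keeping the memory
  // cost of many duplicate objects small.
  private val interner: LoadingCache[Id, Id] = CacheBuilder.newBuilder()
    .maximumSize(10000)
    .build(new CacheLoader[Id, Id]() {
      override def load(id: Id): Id = id
    })

  def intern(id: Id): Id = interner.get(id)
}
```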
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Random
import scala.util.control.NonFatal

import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.cache.CacheBuilder

import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
import org.apache.spark.annotation.DeveloperApi
@@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint(

// Mapping from executor id to the block manager's local disk directories.
private val executorIdToLocalDirs =
Caffeine
CacheBuilder
.newBuilder()
.maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
.build[String, Array[String]]()
19 changes: 8 additions & 11 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,7 +44,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import scala.util.matching.Regex

import _root_.io.netty.channel.unix.Errors.NativeIoException
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.collect.Interners
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
@@ -1616,16 +1616,13 @@ private[spark] object Utils extends Logging {
if (compressedLogFileLengthCache == null) {
val compressedLogFileLengthCacheSize = sparkConf.get(
UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
compressedLogFileLengthCache = {
val builder = Caffeine.newBuilder()
.maximumSize(compressedLogFileLengthCacheSize)
builder.build[String, java.lang.Long](
new CacheLoader[String, java.lang.Long]() {
override def load(path: String): java.lang.Long = {
Utils.getCompressedFileLength(new File(path))
}
})
}
compressedLogFileLengthCache = CacheBuilder.newBuilder()
.maximumSize(compressedLogFileLengthCacheSize)
.build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
override def load(path: String): java.lang.Long = {
Utils.getCompressedFileLength(new File(path))
}
})
}
compressedLogFileLengthCache
}
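Note: the compressed log length cache follows the same Guava loading-cache shape; values are boxed as java.lang.Long because Guava caches store objects. A sketch with an assumed size bound and a simplified loader in place of getCompressedFileLength:

```scala
import java.io.File

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object FileLengthCacheSketch {
  // Keyed by path string; the loader computes the length once and the boxed
  // java.lang.Long result is reused on later lookups.
  private val lengthCache: LoadingCache[String, java.lang.Long] =
    CacheBuilder.newBuilder()
      .maximumSize(128)
      .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
        override def load(path: String): java.lang.Long = new File(path).length()
      })

  def cachedLength(path: String): Long = lengthCache.get(path)
}
```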