diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index bf8c6ae0ab31..9f600e077e15 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.commons.lang3.tuple.Pair;
@@ -112,6 +113,12 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
     this.registeredExecutorFile = registeredExecutorFile;
     String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
+    long indexCacheSizeBytes = JavaUtils.byteStringAsBytes(indexCacheSize);
+    // DEFAULT_CONCURRENCY_LEVEL is 4, so if indexCacheSizeBytes > 8g bytes (8589934592L),
+    // maxSegmentWeight will be more than 2g and weight eviction will not work due to Guava#1761.
+    Preconditions.checkArgument(indexCacheSizeBytes <= 8589934592L,
+      "The value of 'spark.shuffle.service.index.cache.size' shouldn't " +
+        "exceed 8g bytes due to Guava#1761");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
@@ -119,7 +126,7 @@ public ShuffleIndexInformation load(File file) throws IOException {
           }
         };
     shuffleIndexCache = CacheBuilder.newBuilder()
-      .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
+      .maximumWeight(indexCacheSizeBytes)
       .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
       .build(indexCacheLoader);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index d0eb4aed6593..6bea818acf8e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -122,6 +122,12 @@ public RemoteBlockPushResolver(TransportConf conf) {
       NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
     this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
+    long mergedIndexCacheSize = conf.mergedIndexCacheSize();
+    // DEFAULT_CONCURRENCY_LEVEL is 4, so if mergedIndexCacheSize > 8g bytes (8589934592L),
+    // maxSegmentWeight will be more than 2g and weight eviction will not work due to Guava#1761.
+    Preconditions.checkArgument(mergedIndexCacheSize <= 8589934592L,
+      "The value of 'spark.shuffle.push.server.mergedIndexCacheSize' shouldn't " +
+        "exceed 8g bytes due to Guava#1761");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
       new CacheLoader<File, ShuffleIndexInformation>() {
         public ShuffleIndexInformation load(File file) throws IOException {
@@ -129,7 +135,7 @@ public ShuffleIndexInformation load(File file) throws IOException {
         }
       };
     indexCache = CacheBuilder.newBuilder()
-      .maximumWeight(conf.mergedIndexCacheSize())
+      .maximumWeight(mergedIndexCacheSize)
       .weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
       .build(indexCacheLoader);
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index b5d800f02862..441f0b87b5d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SizeEstimator
@@ -107,6 +108,13 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
     // than the size of one [[FileStatus]]).
     // so it will support objects up to 64GB in size.
     val weightScale = 32
+    val maximumWeight = maxSizeInBytes / weightScale
+    // DEFAULT_CONCURRENCY_LEVEL is 4 and weightScale is 32, so
+    // if maxSizeInBytes > 256g bytes (274877906944L),
+    // maxSegmentWeight will be more than 2g and weight eviction will not work due to Guava#1761.
+    assert(maxSizeInBytes <= 274877906944L,
+      s"The value of '${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' shouldn't " +
+        "exceed 256g bytes due to Guava#1761")
     val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
       override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
         val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
@@ -136,7 +144,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
     var builder = CacheBuilder.newBuilder()
      .weigher(weigher)
      .removalListener(removalListener)
-      .maximumWeight(maxSizeInBytes / weightScale)
+      .maximumWeight(maximumWeight)
     if (cacheTTL > 0) {
       builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
     }
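
For reference, a minimal sketch (not part of the patch) of the size arithmetic behind the 8g and 256g caps above, assuming Guava CacheBuilder's default concurrency level of 4 and the weightScale of 32 used by SharedInMemoryCache; the class and variable names are illustrative only:

// Illustrative only: Guava splits maximumWeight across segments (roughly
// maximumWeight / concurrencyLevel each), and weight eviction breaks once a
// segment's share exceeds Integer.MAX_VALUE (Guava#1761).
public class GuavaWeightLimitSketch {
  public static void main(String[] args) {
    final long defaultConcurrencyLevel = 4;                  // Guava CacheBuilder default
    final long maxSegmentWeightLimit = Integer.MAX_VALUE;    // 2147483647

    // Shuffle index caches: maximumWeight is the configured byte size directly,
    // so the cap is 8g bytes.
    long indexCacheCap = 8L * 1024 * 1024 * 1024;            // 8589934592
    long perSegment = indexCacheCap / defaultConcurrencyLevel;
    System.out.println(perSegment);                          // 2147483648
    System.out.println(perSegment > maxSegmentWeightLimit);  // true -> eviction would break

    // FileStatusCache: maximumWeight = maxSizeInBytes / weightScale (32),
    // so the equivalent cap on maxSizeInBytes is 256g bytes.
    long weightScale = 32;
    long fileStatusCacheCap = 256L * 1024 * 1024 * 1024;     // 274877906944
    System.out.println(fileStatusCacheCap / weightScale / defaultConcurrencyLevel);  // 2147483648
  }
}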