@@ -26,6 +26,7 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.commons.lang3.tuple.Pair;
@@ -112,14 +113,20 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
     Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
     this.registeredExecutorFile = registeredExecutorFile;
     String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
+    long indexCacheSizeBytes = JavaUtils.byteStringAsBytes(indexCacheSize);
+    // DEFAULT_CONCURRENCY_LEVEL is 4, so if indexCacheSizeBytes > 8g (8589934592L bytes),
+    // maxSegmentWeight will exceed 2g and weight eviction will not work due to Guava#1761.
+    Preconditions.checkArgument(indexCacheSizeBytes <= 8589934592L,
+      "The value of 'spark.shuffle.service.index.cache.size' shouldn't " +
+        "exceed 8g due to Guava#1761");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
             return new ShuffleIndexInformation(file);
           }
         };
     shuffleIndexCache = CacheBuilder.newBuilder()
-        .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
+        .maximumWeight(indexCacheSizeBytes)
         .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
         .build(indexCacheLoader);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
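The 8g cap comes from how Guava budgets weight internally: maximumWeight is split across the cache's segments (DEFAULT_CONCURRENCY_LEVEL is 4), and each segment tracks its weight in an int, which is what Guava#1761 describes. Here is a minimal sketch of the arithmetic under those assumptions; the class and variable names are illustrative, not part of the patch:

import com.google.common.base.Preconditions;

// Each Guava segment gets roughly maximumWeight / 4; once that share passes
// Integer.MAX_VALUE (~2g), weight eviction silently stops (Guava#1761).
public class IndexCacheBoundSketch {
  private static final long MAX_INDEX_CACHE_SIZE = 8589934592L; // 8g

  public static void main(String[] args) {
    long requested = 16L * 1024 * 1024 * 1024;    // a hypothetical "16g" setting
    long perSegmentShare = requested / 4;         // ~4g, past the int range
    System.out.println(perSegmentShare > Integer.MAX_VALUE);  // true
    // The patch rejects such values up front instead:
    Preconditions.checkArgument(requested <= MAX_INDEX_CACHE_SIZE,
      "The value of 'spark.shuffle.service.index.cache.size' shouldn't " +
        "exceed 8g due to Guava#1761");           // throws IllegalArgumentException
  }
}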
@@ -122,14 +122,20 @@ public RemoteBlockPushResolver(TransportConf conf) {
     NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
     this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
+    long mergedIndexCacheSize = conf.mergedIndexCacheSize();
+    // DEFAULT_CONCURRENCY_LEVEL is 4, so if mergedIndexCacheSize > 8g (8589934592L bytes),
+    // maxSegmentWeight will exceed 2g and weight eviction will not work due to Guava#1761.
+    Preconditions.checkArgument(mergedIndexCacheSize <= 8589934592L,
+      "The value of 'spark.shuffle.push.server.mergedIndexCacheSize' shouldn't " +
+        "exceed 8g due to Guava#1761");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
             return new ShuffleIndexInformation(file);
           }
         };
     indexCache = CacheBuilder.newBuilder()
-        .maximumWeight(conf.mergedIndexCacheSize())
+        .maximumWeight(mergedIndexCacheSize)
         .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
         .build(indexCacheLoader);
   }
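Both resolvers build the same kind of weighted LoadingCache: a CacheLoader that loads the index information on a miss, and a Weigher that charges each entry its in-memory size. Below is a minimal, self-contained sketch of that pattern at toy scale, with String standing in for File and ShuffleIndexInformation; it demonstrates the pattern and is not Spark code:

import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;

public class WeightedCacheSketch {
  public static void main(String[] args) throws ExecutionException {
    CacheLoader<String, String> loader = new CacheLoader<String, String>() {
      public String load(String key) {
        return key + "-index";                    // stands in for reading an index file
      }
    };
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
      .concurrencyLevel(1)                        // one segment, for a deterministic demo
      .maximumWeight(32)                          // total weight budget
      .weigher((Weigher<String, String>) (key, value) -> value.length())
      .build(loader);
    for (int i = 0; i < 10; i++) {
      cache.get("shuffle-" + i);                  // each entry weighs 15
    }
    System.out.println(cache.size());             // 2: older entries were evicted
  }
}

At this small scale the weigher and maximumWeight interact as intended; the Guava#1761 failure only appears once a segment's budget no longer fits in an int, which the new checkArgument calls rule out up front.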
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SizeEstimator


@@ -107,6 +108,13 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging
     // than the size of one [[FileStatus]]).
     // so it will support objects up to 64GB in size.
     val weightScale = 32
+    val maximumWeight = maxSizeInBytes / weightScale
+    // DEFAULT_CONCURRENCY_LEVEL is 4 and weightScale is 32, so if
+    // maxSizeInBytes > 256g (274877906944L bytes), maxSegmentWeight will
+    // exceed 2g and weight eviction will not work due to Guava#1761.
+    assert(maxSizeInBytes <= 274877906944L,
+      s"The value of '${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' shouldn't " +
+        "exceed 256g due to Guava#1761")
     val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
       override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
         val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
@@ -136,7 +144,7 @@
     var builder = CacheBuilder.newBuilder()
       .weigher(weigher)
       .removalListener(removalListener)
-      .maximumWeight(maxSizeInBytes / weightScale)
+      .maximumWeight(maximumWeight)
 
     if (cacheTTL > 0) {
       builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
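The 256g figure is the same per-segment int limit scaled by weightScale: entries are weighed as estimatedSize / 32, so the cache's weight budget is maxSizeInBytes / 32, and that budget split across 4 segments must stay within Integer.MAX_VALUE. A quick derivation, assuming Guava's default concurrency level; the class name is illustrative:

public class FileStatusCacheBoundSketch {
  public static void main(String[] args) {
    long perSegmentLimit = Integer.MAX_VALUE;     // ~2g int weight counter per segment
    long segments = 4;                            // Guava DEFAULT_CONCURRENCY_LEVEL
    long weightScale = 32;                        // bytes charged per unit of weight
    // maxSizeInBytes / weightScale / segments must stay within int range,
    // so maxSizeInBytes can be at most about 2g * 4 * 32 = 256g.
    System.out.println(perSegmentLimit * segments * weightScale);  // 274877906816, just under 256g
  }
}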