From 98458fd364927627166a376ee1a145dcad1f2693 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 26 Aug 2021 15:59:24 +0800 Subject: [PATCH 1/5] add Guava cache bad case --- .../shuffle/ExternalShuffleBlockResolver.java | 3 +- .../ExternalShuffleBlockResolverSuite.java | 50 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index bf8c6ae0ab31a..3189eca2a28f2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -80,7 +80,8 @@ public class ExternalShuffleBlockResolver { * Caches index file information so that we can avoid open/close the index files * for each block fetch. */ - private final LoadingCache shuffleIndexCache; + @VisibleForTesting + final LoadingCache shuffleIndexCache; // Single-threaded Java executor used to perform expensive recursive directory deletion. private final Executor directoryCleaner; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 04d4bdf92bae7..f7bdd2458b622 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -17,20 +17,28 @@ package org.apache.spark.network.shuffle; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.cache.LoadingCache; import com.google.common.io.CharStreams; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; +import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.*; @@ -110,6 +118,48 @@ public void testSortShuffleBlocks() throws IOException { } } + @Test + public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException { + Map config = new HashMap<>(); + String indexCacheSize = "8192m"; + config.put("spark.shuffle.service.index.cache.size", indexCacheSize); + TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config)); + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null); + resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); + + LoadingCache shuffleIndexCache = resolver.shuffleIndexCache; + + // 8g -> 8589934592 bytes + long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize); + int unitSize = 1048575; + // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL + int concurrencyLevel = 4; + int totalGetCount = 16384; + // maxCacheCount is 8192 + long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel; + for (int i = 0; i < totalGetCount; i++) { + File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index"); + ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class); + Mockito.when(indexInfo.getSize()).thenReturn(unitSize); + shuffleIndexCache.get(indexFile, () -> indexInfo); + } + + long totalWeight = + shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum(); + long size = shuffleIndexCache.size(); + try{ + Assert.assertTrue(size <= maxCacheCount); + Assert.assertTrue(totalWeight < maximumWeight); + fail("The tests code should not enter this line now."); + } catch (AssertionError error) { + // The code will enter this branch because LocalCache weight eviction does not work + // when maxSegmentWeight is >= Int.MAX_VALUE. + // TODO remove cache AssertionError after fix this bug. + Assert.assertTrue(size > maxCacheCount && size <= totalGetCount); + Assert.assertTrue(totalWeight > maximumWeight); + } + } + @Test public void jsonSerializationOfExecutorRegistration() throws IOException { ObjectMapper mapper = new ObjectMapper(); From 8ec752568f6ce075bf3eae9a5b856ddea44708b5 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 26 Aug 2021 21:23:11 +0800 Subject: [PATCH 2/5] try to upgrade guava version --- .../shuffle/ExternalShuffleBlockResolverSuite.java | 13 ++----------- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 7 ++++++- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 7 ++++++- pom.xml | 2 +- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index f7bdd2458b622..0885f66dccf03 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -147,17 +147,8 @@ public void testShuffleIndexCacheEvictionBehavior() throws IOException, Executio long totalWeight = shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum(); long size = shuffleIndexCache.size(); - try{ - Assert.assertTrue(size <= maxCacheCount); - Assert.assertTrue(totalWeight < maximumWeight); - fail("The tests code should not enter this line now."); - } catch (AssertionError error) { - // The code will enter this branch because LocalCache weight eviction does not work - // when maxSegmentWeight is >= Int.MAX_VALUE. - // TODO remove cache AssertionError after fix this bug. - Assert.assertTrue(size > maxCacheCount && size <= totalGetCount); - Assert.assertTrue(totalWeight > maximumWeight); - } + Assert.assertTrue(size <= maxCacheCount); + Assert.assertTrue(totalWeight < maximumWeight); } @Test diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 10eaa5cf6c5e4..e2e29062651bf 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -31,6 +31,7 @@ bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar breeze_2.12/1.2//breeze_2.12-1.2.jar cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar +checker-qual/3.8.0//checker-qual-3.8.0.jar chill-java/0.10.0//chill-java-0.10.0.jar chill_2.12/0.10.0//chill_2.12-0.10.0.jar commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar @@ -62,10 +63,12 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar +error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar +failureaccess/1.0.1//failureaccess-1.0.1.jar flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar generex/1.0.2//generex-1.0.2.jar gson/2.2.4//gson-2.2.4.jar -guava/14.0.1//guava-14.0.1.jar +guava/30.1.1-jre//guava-30.1.1-jre.jar guice-servlet/3.0//guice-servlet-3.0.jar guice/3.0//guice-3.0.jar hadoop-annotations/2.7.4//hadoop-annotations-2.7.4.jar @@ -106,6 +109,7 @@ httpclient/4.5.13//httpclient-4.5.13.jar httpcore/4.4.14//httpcore-4.4.14.jar istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar ivy/2.5.0//ivy-2.5.0.jar +j2objc-annotations/1.3//j2objc-annotations-1.3.jar jackson-annotations/2.12.3//jackson-annotations-2.12.3.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.12.3//jackson-core-2.12.3.jar @@ -179,6 +183,7 @@ lapack/2.2.0//lapack-2.2.0.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar +listenablefuture/9999.0-empty-to-avoid-conflict-with-guava//listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar log4j/1.2.17//log4j-1.2.17.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 2f9e709f2b60a..01114b5a3af81 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -26,6 +26,7 @@ bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar breeze_2.12/1.2//breeze_2.12-1.2.jar cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar +checker-qual/3.8.0//checker-qual-3.8.0.jar chill-java/0.10.0//chill-java-0.10.0.jar chill_2.12/0.10.0//chill_2.12-0.10.0.jar commons-cli/1.2//commons-cli-1.2.jar @@ -53,10 +54,12 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar +error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar +failureaccess/1.0.1//failureaccess-1.0.1.jar flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar generex/1.0.2//generex-1.0.2.jar gson/2.2.4//gson-2.2.4.jar -guava/14.0.1//guava-14.0.1.jar +guava/30.1.1-jre//guava-30.1.1-jre.jar hadoop-client-api/3.3.1//hadoop-client-api-3.3.1.jar hadoop-client-runtime/3.3.1//hadoop-client-runtime-3.3.1.jar hadoop-shaded-guava/1.1.1//hadoop-shaded-guava-1.1.1.jar @@ -84,6 +87,7 @@ httpclient/4.5.13//httpclient-4.5.13.jar httpcore/4.4.14//httpcore-4.4.14.jar istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar ivy/2.5.0//ivy-2.5.0.jar +j2objc-annotations/1.3//j2objc-annotations-1.3.jar jackson-annotations/2.12.3//jackson-annotations-2.12.3.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.12.3//jackson-core-2.12.3.jar @@ -150,6 +154,7 @@ lapack/2.2.0//lapack-2.2.0.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar +listenablefuture/9999.0-empty-to-avoid-conflict-with-guava//listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar log4j/1.2.17//log4j-1.2.17.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar diff --git a/pom.xml b/pom.xml index 214a5e87b95fc..836a4c0d55fd4 100644 --- a/pom.xml +++ b/pom.xml @@ -181,7 +181,7 @@ 2.6.2 4.1.17 - 14.0.1 + 30.1.1-jre 3.0.16 2.34 2.10.10 From 6c4b06d3359be03cda877296dd0f169df025c87d Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 26 Aug 2021 22:50:25 +0800 Subject: [PATCH 3/5] Revert "try to upgrade guava version" This reverts commit 8ec752568f6ce075bf3eae9a5b856ddea44708b5. --- .../shuffle/ExternalShuffleBlockResolverSuite.java | 13 +++++++++++-- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 7 +------ dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 7 +------ pom.xml | 2 +- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 0885f66dccf03..f7bdd2458b622 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -147,8 +147,17 @@ public void testShuffleIndexCacheEvictionBehavior() throws IOException, Executio long totalWeight = shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum(); long size = shuffleIndexCache.size(); - Assert.assertTrue(size <= maxCacheCount); - Assert.assertTrue(totalWeight < maximumWeight); + try{ + Assert.assertTrue(size <= maxCacheCount); + Assert.assertTrue(totalWeight < maximumWeight); + fail("The tests code should not enter this line now."); + } catch (AssertionError error) { + // The code will enter this branch because LocalCache weight eviction does not work + // when maxSegmentWeight is >= Int.MAX_VALUE. + // TODO remove cache AssertionError after fix this bug. + Assert.assertTrue(size > maxCacheCount && size <= totalGetCount); + Assert.assertTrue(totalWeight > maximumWeight); + } } @Test diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index e2e29062651bf..10eaa5cf6c5e4 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -31,7 +31,6 @@ bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar breeze_2.12/1.2//breeze_2.12-1.2.jar cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar -checker-qual/3.8.0//checker-qual-3.8.0.jar chill-java/0.10.0//chill-java-0.10.0.jar chill_2.12/0.10.0//chill_2.12-0.10.0.jar commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar @@ -63,12 +62,10 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar -error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar -failureaccess/1.0.1//failureaccess-1.0.1.jar flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar generex/1.0.2//generex-1.0.2.jar gson/2.2.4//gson-2.2.4.jar -guava/30.1.1-jre//guava-30.1.1-jre.jar +guava/14.0.1//guava-14.0.1.jar guice-servlet/3.0//guice-servlet-3.0.jar guice/3.0//guice-3.0.jar hadoop-annotations/2.7.4//hadoop-annotations-2.7.4.jar @@ -109,7 +106,6 @@ httpclient/4.5.13//httpclient-4.5.13.jar httpcore/4.4.14//httpcore-4.4.14.jar istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar ivy/2.5.0//ivy-2.5.0.jar -j2objc-annotations/1.3//j2objc-annotations-1.3.jar jackson-annotations/2.12.3//jackson-annotations-2.12.3.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.12.3//jackson-core-2.12.3.jar @@ -183,7 +179,6 @@ lapack/2.2.0//lapack-2.2.0.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar -listenablefuture/9999.0-empty-to-avoid-conflict-with-guava//listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar log4j/1.2.17//log4j-1.2.17.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 01114b5a3af81..2f9e709f2b60a 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -26,7 +26,6 @@ bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar breeze_2.12/1.2//breeze_2.12-1.2.jar cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar -checker-qual/3.8.0//checker-qual-3.8.0.jar chill-java/0.10.0//chill-java-0.10.0.jar chill_2.12/0.10.0//chill_2.12-0.10.0.jar commons-cli/1.2//commons-cli-1.2.jar @@ -54,12 +53,10 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar -error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar -failureaccess/1.0.1//failureaccess-1.0.1.jar flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar generex/1.0.2//generex-1.0.2.jar gson/2.2.4//gson-2.2.4.jar -guava/30.1.1-jre//guava-30.1.1-jre.jar +guava/14.0.1//guava-14.0.1.jar hadoop-client-api/3.3.1//hadoop-client-api-3.3.1.jar hadoop-client-runtime/3.3.1//hadoop-client-runtime-3.3.1.jar hadoop-shaded-guava/1.1.1//hadoop-shaded-guava-1.1.1.jar @@ -87,7 +84,6 @@ httpclient/4.5.13//httpclient-4.5.13.jar httpcore/4.4.14//httpcore-4.4.14.jar istack-commons-runtime/3.0.8//istack-commons-runtime-3.0.8.jar ivy/2.5.0//ivy-2.5.0.jar -j2objc-annotations/1.3//j2objc-annotations-1.3.jar jackson-annotations/2.12.3//jackson-annotations-2.12.3.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.12.3//jackson-core-2.12.3.jar @@ -154,7 +150,6 @@ lapack/2.2.0//lapack-2.2.0.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar -listenablefuture/9999.0-empty-to-avoid-conflict-with-guava//listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar log4j/1.2.17//log4j-1.2.17.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar diff --git a/pom.xml b/pom.xml index 836a4c0d55fd4..214a5e87b95fc 100644 --- a/pom.xml +++ b/pom.xml @@ -181,7 +181,7 @@ 2.6.2 4.1.17 - 30.1.1-jre + 14.0.1 3.0.16 2.34 2.10.10 From 0fade2abb70ff13776275cc3eec4f4e5286bc5f0 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 31 Aug 2021 20:43:07 +0800 Subject: [PATCH 4/5] add value check --- .../shuffle/ExternalShuffleBlockResolver.java | 11 ++-- .../shuffle/RemoteBlockPushResolver.java | 7 ++- .../ExternalShuffleBlockResolverSuite.java | 50 ------------------- .../datasources/FileStatusCache.scala | 10 +++- 4 files changed, 23 insertions(+), 55 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 3189eca2a28f2..2684b1a919547 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -26,6 +26,7 @@ import java.util.concurrent.Executors; import java.util.stream.Collectors; +import com.google.common.base.Preconditions; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.commons.lang3.tuple.Pair; @@ -80,8 +81,7 @@ public class ExternalShuffleBlockResolver { * Caches index file information so that we can avoid open/close the index files * for each block fetch. */ - @VisibleForTesting - final LoadingCache shuffleIndexCache; + private final LoadingCache shuffleIndexCache; // Single-threaded Java executor used to perform expensive recursive directory deletion. private final Executor directoryCleaner; @@ -113,6 +113,11 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false")); this.registeredExecutorFile = registeredExecutorFile; String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); + long indexCacheSizeBytes = JavaUtils.byteStringAsBytes(indexCacheSize); + // DEFAULT_CONCURRENCY_LEVEL is 4 and if indexCacheSizeBytes > 8g bytes(8589934592L), + // maxSegmentWeight will more than 2g, the weight eviction will not work due to Guava#1761. + Preconditions.checkArgument(indexCacheSizeBytes <= 8589934592L, + "The value of 'spark.shuffle.service.index.cache.size' shouldn't exceed 8g bytes due to Guava#1761"); CacheLoader indexCacheLoader = new CacheLoader() { public ShuffleIndexInformation load(File file) throws IOException { @@ -120,7 +125,7 @@ public ShuffleIndexInformation load(File file) throws IOException { } }; shuffleIndexCache = CacheBuilder.newBuilder() - .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize)) + .maximumWeight(indexCacheSizeBytes) .weigher((Weigher) (file, indexInfo) -> indexInfo.getSize()) .build(indexCacheLoader); db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index d0eb4aed65934..eccfb1f7450d3 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -122,6 +122,11 @@ public RemoteBlockPushResolver(TransportConf conf) { NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge(); + long mergedIndexCacheSize = conf.mergedIndexCacheSize(); + // DEFAULT_CONCURRENCY_LEVEL is 4 and if mergedIndexCacheSize > 8g bytes(8589934592L), + // maxSegmentWeight will more than 2g, the weight eviction will not work due to Guava#1761. + Preconditions.checkArgument(mergedIndexCacheSize <= 8589934592L, + "The value of 'spark.shuffle.push.server.mergedIndexCacheSize' shouldn't exceed 8g bytes due to Guava#1761"); CacheLoader indexCacheLoader = new CacheLoader() { public ShuffleIndexInformation load(File file) throws IOException { @@ -129,7 +134,7 @@ public ShuffleIndexInformation load(File file) throws IOException { } }; indexCache = CacheBuilder.newBuilder() - .maximumWeight(conf.mergedIndexCacheSize()) + .maximumWeight(mergedIndexCacheSize) .weigher((Weigher)(file, indexInfo) -> indexInfo.getSize()) .build(indexCacheLoader); } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index f7bdd2458b622..04d4bdf92bae7 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -17,28 +17,20 @@ package org.apache.spark.network.shuffle; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.cache.LoadingCache; import com.google.common.io.CharStreams; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; -import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; import static org.junit.Assert.*; @@ -118,48 +110,6 @@ public void testSortShuffleBlocks() throws IOException { } } - @Test - public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException { - Map config = new HashMap<>(); - String indexCacheSize = "8192m"; - config.put("spark.shuffle.service.index.cache.size", indexCacheSize); - TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config)); - ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null); - resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); - - LoadingCache shuffleIndexCache = resolver.shuffleIndexCache; - - // 8g -> 8589934592 bytes - long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize); - int unitSize = 1048575; - // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL - int concurrencyLevel = 4; - int totalGetCount = 16384; - // maxCacheCount is 8192 - long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel; - for (int i = 0; i < totalGetCount; i++) { - File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index"); - ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class); - Mockito.when(indexInfo.getSize()).thenReturn(unitSize); - shuffleIndexCache.get(indexFile, () -> indexInfo); - } - - long totalWeight = - shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum(); - long size = shuffleIndexCache.size(); - try{ - Assert.assertTrue(size <= maxCacheCount); - Assert.assertTrue(totalWeight < maximumWeight); - fail("The tests code should not enter this line now."); - } catch (AssertionError error) { - // The code will enter this branch because LocalCache weight eviction does not work - // when maxSegmentWeight is >= Int.MAX_VALUE. - // TODO remove cache AssertionError after fix this bug. - Assert.assertTrue(size > maxCacheCount && size <= totalGetCount); - Assert.assertTrue(totalWeight > maximumWeight); - } - } - @Test public void jsonSerializationOfExecutorRegistration() throws IOException { ObjectMapper mapper = new ObjectMapper(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala index b5d800f02862e..441f0b87b5d0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.SizeEstimator @@ -107,6 +108,13 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends // than the size of one [[FileStatus]]). // so it will support objects up to 64GB in size. val weightScale = 32 + val maximumWeight = maxSizeInBytes / weightScale + // DEFAULT_CONCURRENCY_LEVEL is 4, weightScale is 32, + // if maxSizeInBytes > 256g bytes(274877906944L), + // maxSegmentWeight will more than 2g, the weight eviction will not work due to Guava#1761. + assert(maximumWeight <= 274877906944L, + s"The value of '${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' shouldn't " + + "exceed 256g bytes due to Guava#1761") val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] { override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = { val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale @@ -136,7 +144,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends var builder = CacheBuilder.newBuilder() .weigher(weigher) .removalListener(removalListener) - .maximumWeight(maxSizeInBytes / weightScale) + .maximumWeight(maximumWeight) if (cacheTTL > 0) { builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) From 3b965c2248d35ff2c3c12fb81124edb8f9e303ba Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 3 Sep 2021 11:04:12 +0800 Subject: [PATCH 5/5] fix Line is longer than 100 characters --- .../spark/network/shuffle/ExternalShuffleBlockResolver.java | 3 ++- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 2684b1a919547..9f600e077e15d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -117,7 +117,8 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF // DEFAULT_CONCURRENCY_LEVEL is 4 and if indexCacheSizeBytes > 8g bytes(8589934592L), // maxSegmentWeight will more than 2g, the weight eviction will not work due to Guava#1761. Preconditions.checkArgument(indexCacheSizeBytes <= 8589934592L, - "The value of 'spark.shuffle.service.index.cache.size' shouldn't exceed 8g bytes due to Guava#1761"); + "The value of 'spark.shuffle.service.index.cache.size' shouldn't " + + "exceed 8g bytes due to Guava#1761"); CacheLoader indexCacheLoader = new CacheLoader() { public ShuffleIndexInformation load(File file) throws IOException { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index eccfb1f7450d3..6bea818acf8ee 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -126,7 +126,8 @@ public RemoteBlockPushResolver(TransportConf conf) { // DEFAULT_CONCURRENCY_LEVEL is 4 and if mergedIndexCacheSize > 8g bytes(8589934592L), // maxSegmentWeight will more than 2g, the weight eviction will not work due to Guava#1761. Preconditions.checkArgument(mergedIndexCacheSize <= 8589934592L, - "The value of 'spark.shuffle.push.server.mergedIndexCacheSize' shouldn't exceed 8g bytes due to Guava#1761"); + "The value of 'spark.shuffle.push.server.mergedIndexCacheSize' shouldn't " + + "exceed 8g bytes due to Guava#1761"); CacheLoader indexCacheLoader = new CacheLoader() { public ShuffleIndexInformation load(File file) throws IOException {