Skip to content

Conversation

@LuciferYang
Copy link
Contributor

@LuciferYang LuciferYang commented Aug 26, 2021

What changes were proposed in this pull request?

The Guava Cache use in Spark with weight eviction mechanism has risk of memory leak because LocalCache weight eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE Guava#1761

The main change of this pr is add the maximumWeight value limit for Guava cache instances that use the weight eviction strategy, include:

  1. spark.shuffle.service.index.cache.size should <= 8g bytes
  2. spark.shuffle.push.server.mergedIndexCacheSize should <= 8g bytes
  3. spark.sql.hive.filesourcePartitionFileCacheSize should <= 256g

Why are the changes needed?

Avoiding the weight eviction bug of Guava Cache(Guava#1761)

Before this pr, the following test will fail, some cache entries was not evicted as expected and memory leak will occur

  @Test
  public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException {
    Map<String, String> config = new HashMap<>();
    String indexCacheSize = "8192m";
    config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
    TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config));
    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null);
    resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));

    // need change access scope of shuffleIndexCache
    LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache = resolver.shuffleIndexCache;

    // 8g -> 8589934592 bytes
    long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize);
    int unitSize = 1048575;
    // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
    int concurrencyLevel = 4;
    int totalGetCount = 16384;
    // maxCacheCount is 8192
    long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel;
    for (int i = 0; i < totalGetCount; i++) {
      File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index");
      ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class);
      Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
      shuffleIndexCache.get(indexFile, () -> indexInfo);
    }

    long totalWeight =
      shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum();
    long size = shuffleIndexCache.size();
    // Both of the following assertions fail
    Assert.assertTrue(size <= maxCacheCount);
    Assert.assertTrue(totalWeight < maximumWeight);
  }

And through the debug view, there are 2 segments.totalWeight overflowed.

LocalCache

After this pr, the above issue are avoided through the maximum limit because maximumWeight can no longer be greater than 2g.

Does this PR introduce any user-facing change?

The following 3 configs add the maximum limit:

  1. spark.shuffle.service.index.cache.size should <= 8g bytes
  2. spark.shuffle.push.server.mergedIndexCacheSize should <= 8g bytes
  3. spark.sql.hive.filesourcePartitionFileCacheSize should <= 256g

How was this patch tested?

Pass GA or Jenkins Tests.

@github-actions github-actions bot added the CORE label Aug 26, 2021
File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index");
ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class);
Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
shuffleIndexCache.get(indexFile, () -> indexInfo);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not easy to mock the init behavior of ShuffleIndexInformation, so there re-write CacheLoader.

try{
Assert.assertTrue(size <= maxCacheCount);
Assert.assertTrue(totalWeight < maximumWeight);
fail("The tests code should not enter this line now.");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should enter line 153, but it won't now

@LuciferYang LuciferYang marked this pull request as draft August 26, 2021 08:43
@LuciferYang
Copy link
Contributor Author

 from the debug view  we found that there are 2 segment.totalWeight is a negative value:

LocalCache

@LuciferYang
Copy link
Contributor Author

LuciferYang commented Aug 26, 2021

Maybe there the following ways can solve the problem:

  1. Upgrade the Guava version can fix this bug
  2. Or we can avoid this issue by limiting the configuration related to maximumWeight to the security range

I will try to upgrade Guava version first after 98458fd build successful

Do you have any better suggestions ? @HyukjinKwon @dongjoon-hyun @srowen

public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException {
Map<String, String> config = new HashMap<>();
String indexCacheSize = "8192m";
config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configure spark.shuffle.service.index.cache.size with 8g and the default concurrencyLevel is 4, so the maxSegmentWeight Local Cache is Int.MAX_VALUE, these condition will trigger the bug

@SparkQA
Copy link

SparkQA commented Aug 26, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47303/

@SparkQA
Copy link

SparkQA commented Aug 26, 2021

Test build #142802 has finished for PR 33848 at commit 98458fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LuciferYang LuciferYang changed the title [WIP][SPARK-36598][SHUFFLE][SQL] Fix memory leak in cache use weight eviction mechanism [WIP][SPARK-36598][SHUFFLE][SQL] Fix memory leak when cache use weight eviction mechanism Aug 26, 2021
@github-actions github-actions bot added the BUILD label Aug 26, 2021
@LuciferYang
Copy link
Contributor Author

LuciferYang commented Aug 26, 2021

8ec7525 upgrade Guava to 30.1.1-jre to fix LocalCache weight eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE issue and change UT to check the eviction behavior

datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
derby/10.14.2.0//derby-10.14.2.0.jar
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
Copy link
Contributor Author

@LuciferYang LuciferYang Aug 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon @dongjoon-hyun @srowen It seems that upgrading the Guava version can solve the problem, but it will also add additional dependencies. I'm not sure whether it is valuable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new dep is fine, it's ALv2 https://github.com/google/error-prone
yes the problem is that it can potentially interfere with other libs like Hadoop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, I agree with @srowen 's comment.

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If upgradingGuava version is not a good way, it seems that it is not easy to fix this potential bug. I have manually verified that Caffeine does not have this problem, but maybe we are not willing to accept it because #33784

So can we prevent users from using a problematic config by adding some config value check?

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen @dongjoon-hyun
As comments above:

  1. Upgrading the Guava version will have potential interfere with other libs like Hadoop
  2. Use Caffeine will introduce new dependent libs. (Revert "[SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache" #33784)

Therefore, I added a maximum limit to the related configurations to avoid the potential bug of Guava Cache Guava#1761

@SparkQA
Copy link

SparkQA commented Aug 26, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47320/

@LuciferYang
Copy link
Contributor Author

LuciferYang commented Aug 26, 2021

Through #30022 , I found that upgrading guava has some impact on hadoop-2.x, so I will revet the change of 8ec7525 and waiting for others advice

@SparkQA
Copy link

SparkQA commented Aug 26, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47321/

@SparkQA
Copy link

SparkQA commented Aug 26, 2021

Test build #142818 has finished for PR 33848 at commit 8ec7525.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 26, 2021

Test build #142819 has finished for PR 33848 at commit 6c4b06d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions github-actions bot added the SQL label Aug 31, 2021
@LuciferYang LuciferYang changed the title [WIP][SPARK-36598][SHUFFLE][SQL] Fix memory leak when cache use weight eviction mechanism [SPARK-36598][SHUFFLE][SQL] Avoid the potential memory leak of Guava cache by add maximumWeight limit Aug 31, 2021
@LuciferYang LuciferYang changed the title [SPARK-36598][SHUFFLE][SQL] Avoid the potential memory leak of Guava cache by add maximumWeight limit [SPARK-36598][SHUFFLE][SQL] Avoid the memory leak of Guava cache by add maximumWeight limit Aug 31, 2021
@LuciferYang LuciferYang marked this pull request as ready for review August 31, 2021 13:05
@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47391/

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47391/

@SparkQA
Copy link

SparkQA commented Aug 31, 2021

Test build #142888 has finished for PR 33848 at commit 0fade2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47456/

@SparkQA
Copy link

SparkQA commented Sep 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47456/

@SparkQA
Copy link

SparkQA commented Sep 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47458/

@SparkQA
Copy link

SparkQA commented Sep 3, 2021

Test build #142956 has finished for PR 33848 at commit 3b965c2.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47458/

@SparkQA
Copy link

SparkQA commented Sep 3, 2021

Test build #142960 has finished for PR 33848 at commit 5524c68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TimestampNTZType(AtomicType, metaclass=DataTypeSingleton):
  • abstract class AbstractPodsAllocator
  • class ExecutorPodsAllocator(
  • case class GenerateExecID(podName: String) extends Serializable
  • case class Arguments(
  • class StatefulsetPodsAllocator(
  • case class UnresolvedDBObjectName(nameParts: Seq[String], isNamespace: Boolean) extends LeafNode
  • case class ResolvedDBObjectName(catalog: CatalogPlugin, nameParts: Seq[String]) extends LeafNode
  • trait SupportsPushDownCatalystFilters
  • class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
  • class AvailableNowMicroBatchStreamWrapper(delegate: MicroBatchStream)
  • class AvailableNowSourceWrapper(delegate: Source)
  • case class SingleBatchExecutor() extends TriggerExecutor
  • case class MultiBatchExecutor() extends TriggerExecutor
  • class HiveThriftServer2AppStatusStore(store: KVStore)

@SparkQA
Copy link

SparkQA commented Sep 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47759/

@SparkQA
Copy link

SparkQA commented Sep 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47759/

@SparkQA
Copy link

SparkQA commented Sep 14, 2021

Test build #143256 has finished for PR 33848 at commit f200546.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class NettyLogger
  • public final class LZ4Compressor
  • public final class LZ4Factory
  • public final class LZ4SafeDecompressor
  • class DatetimeNTZOps(DatetimeOps):
  • class DatetimeNTZConverter(object):
  • public final class AlwaysFalse extends Filter
  • public final class AlwaysTrue extends Filter
  • public final class And extends BinaryFilter
  • abstract class BinaryComparison extends Filter
  • abstract class BinaryFilter extends Filter
  • public final class EqualNullSafe extends BinaryComparison
  • public final class EqualTo extends BinaryComparison
  • public abstract class Filter implements Expression
  • public final class GreaterThan extends BinaryComparison
  • public final class GreaterThanOrEqual extends BinaryComparison
  • public final class In extends Filter
  • public final class IsNotNull extends Filter
  • public final class IsNull extends Filter
  • public final class LessThan extends BinaryComparison
  • public final class LessThanOrEqual extends BinaryComparison
  • public final class Not extends Filter
  • public final class Or extends BinaryFilter
  • public final class StringContains extends StringPredicate
  • public final class StringEndsWith extends StringPredicate
  • abstract class StringPredicate extends Filter
  • public final class StringStartsWith extends StringPredicate
  • case class CastTimestampNTZToLong(child: Expression) extends TimestampToLongBase
  • case class ILike(
  • class SQLOpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
  • case class OptimizeSkewedJoin(
  • case class SkewJoinAwareCost(
  • case class SimpleCostEvaluator(forceOptimizeSkewedJoin: Boolean) extends CostEvaluator
  • case class EnsureRequirements(

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 24, 2021
@github-actions github-actions bot closed this Dec 25, 2021
@LuciferYang LuciferYang deleted the SPARK-36598 branch October 22, 2023 07:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants