From b9eea4994c3ad151aa75ed03bbcf807bc3c4ded8 Mon Sep 17 00:00:00 2001 From: Bo Meng Date: Mon, 25 Jun 2018 13:02:43 -0700 Subject: [PATCH 1/3] fix: SparkContext.binaryFiles ignores the minPartitions parameter --- .../main/scala/org/apache/spark/input/PortableDataStream.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 17cdba4f1305..f0a71ed6a9e3 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -45,7 +45,8 @@ private[spark] abstract class StreamFileInputFormat[T] * which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { - val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) + val defaultMaxSplitBytes = Math.min( + sc.getConf.get(config.FILES_MAX_PARTITION_BYTES), minPartitions) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) val defaultParallelism = sc.defaultParallelism val files = listStatus(context).asScala From 0fc35d4e0db34239cd3c52b0cf21445c59d2dede Mon Sep 17 00:00:00 2001 From: Bo Meng Date: Mon, 25 Jun 2018 13:04:58 -0700 Subject: [PATCH 2/3] should be max() --- .../main/scala/org/apache/spark/input/PortableDataStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index f0a71ed6a9e3..abcb5464aed7 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -45,7 +45,7 @@ private[spark] abstract class StreamFileInputFormat[T] * which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, 
minPartitions: Int) { - val defaultMaxSplitBytes = Math.min( + val defaultMaxSplitBytes = Math.max( sc.getConf.get(config.FILES_MAX_PARTITION_BYTES), minPartitions) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) val defaultParallelism = sc.defaultParallelism From c24fbe5cdf259814c30d9038fa3c35a2934ac39f Mon Sep 17 00:00:00 2001 From: Bo Meng Date: Tue, 26 Jun 2018 10:48:41 -0700 Subject: [PATCH 3/3] fix: apply minPartitions to defaultParallelism instead of the max split size --- .../scala/org/apache/spark/input/PortableDataStream.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index abcb5464aed7..ab020aaf6fa4 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -45,10 +45,9 @@ private[spark] abstract class StreamFileInputFormat[T] * which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { - val defaultMaxSplitBytes = Math.max( - sc.getConf.get(config.FILES_MAX_PARTITION_BYTES), minPartitions) + val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) - val defaultParallelism = sc.defaultParallelism + val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions) val files = listStatus(context).asScala val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism