Commit 7b7c85e

[SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions
## What changes were proposed in this pull request?

Don't leave the thread pool running from the AlterTableRecoverPartitionsCommand DDL command.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes apache#18216 from srowen/SPARK-20920.
1 parent 278ba7a commit 7b7c85e
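
For context on the leak: the command previously created its `ForkJoinPool` inside a `@transient lazy val` and never shut it down, so each invocation left an eight-thread pool behind. Below is a minimal before/after sketch of the pattern, assuming the Scala 2.11-era parallel collections API Spark used at the time; the names are illustrative, not the actual Spark code.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object PoolLifecycleSketch {
  // Leaky shape: the pool backing the task support is never shut down,
  // so its worker threads outlive the computation.
  def leaky(paths: Seq[String]): Seq[Int] = {
    val par = paths.par
    par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    par.map(_.length).seq // the 8-thread pool keeps running after this returns
  }

  // Fixed shape (what the commit does, via ThreadUtils.newForkJoinPool in
  // Spark): scope the pool to the call and shut it down in `finally`.
  def fixed(paths: Seq[String]): Seq[Int] = {
    val pool = new ForkJoinPool(8)
    try {
      val par = paths.par
      par.tasksupport = new ForkJoinTaskSupport(pool)
      par.map(_.length).seq
    } finally {
      pool.shutdown()
    }
  }
}
```

In the patch itself, pool creation moves into `run()` and is paired with `shutdown()` in `finally`, bounding the pool's lifetime to a single command invocation.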

File tree: 1 file changed (+13, −8 lines)
  • sql/core/src/main/scala/org/apache/spark/sql/execution/command


sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 13 additions & 8 deletions
@@ -21,7 +21,6 @@ import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -588,8 +587,15 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
-      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+
+    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
+      try {
+        scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+      } finally {
+        evalPool.shutdown()
+      }
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -610,8 +616,6 @@ case class AlterTableRecoverPartitionsCommand(
     Seq.empty[Row]
   }
 
-  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   private def scanPartitions(
       spark: SparkSession,
       fs: FileSystem,
@@ -620,7 +624,8 @@ case class AlterTableRecoverPartitionsCommand(
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -644,7 +649,7 @@ case class AlterTableRecoverPartitionsCommand(
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
