diff --git a/FORK.md b/FORK.md
index d89b8b683288a..2c2809b43bcdc 100644
--- a/FORK.md
+++ b/FORK.md
@@ -1,4 +1,5 @@
 # Difference with upstream
+* [SPARK-18079](https://issues.apache.org/jira/browse/SPARK-18079) - CollectLimitExec.executeToIterator should perform per-partition limits
 * [SPARK-20952](https://issues.apache.org/jira/browse/SPARK-20952) - ParquetFileFormat should forward TaskContext to its forkjoinpool
 * [SPARK-26626](https://issues.apache.org/jira/browse/SPARK-26626) - Limited the maximum size of repeatedly substituted aliases
 * [SPARK-25200](https://issues.apache.org/jira/browse/SPARK-25200) - Allow setting HADOOP_CONF_DIR as a spark config
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index ddbd0a343ffcf..e3869c446dfd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -45,6 +45,12 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+
+  // TODO(palantir): Reopen upstream PR for SPARK-18079; we need this for an internal product (DP)
+  override def executeToIterator(): Iterator[InternalRow] = {
+    LocalLimitExec(limit, child).executeToIterator().take(limit)
+  }
+
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
   private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
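
For context, the per-partition limit pattern that the `executeToIterator` override relies on (cap each partition at `limit` rows before applying the final global `take(limit)`, as `LocalLimitExec` does) can be sketched at the RDD level. This is a minimal illustration only, not part of the patch; the `SparkSession` setup, partition count, and `limit` value are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object PerPartitionLimitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("per-partition-limit-sketch")
      .getOrCreate()

    val limit = 10
    val rdd = spark.sparkContext.parallelize(1 to 1000000, numSlices = 4)

    // Without a per-partition cap, streaming rows to the driver still forces each
    // partition to be computed in full. Capping every partition first mirrors what
    // LocalLimitExec does: no partition ever produces more than `limit` rows.
    val rows = rdd
      .mapPartitions(_.take(limit)) // per-partition limit (LocalLimitExec analogue)
      .toLocalIterator              // stream partitions to the driver one at a time
      .take(limit)                  // final global limit, as in the override

    println(rows.toList)
    spark.stop()
  }
}
```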