Conversation

@JoshRosen (Contributor) commented Sep 13, 2016

What changes were proposed in this pull request?

In PySpark, `df.take(1)` runs a single-stage job which computes only one partition of the DataFrame, while `df.limit(1).collect()` computes all partitions and runs a two-stage job. This difference in performance is confusing.

The reason why `limit(1).collect()` is so much slower is that `collect()` internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which causes Spark SQL to build a query where a global limit appears in the middle of the plan; this, in turn, ends up being executed inefficiently because limits in the middle of plans are now implemented by repartitioning to a single task rather than by running a `take()` job on the driver (this was done in #7334, a patch which was a prerequisite to allowing partition-local limits to be pushed beneath unions, etc.).

In order to fix this performance problem I think that we should generalize the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates to the Scala implementation and shares the same performance properties. This patch modifies `DataFrame.collect()` to first collect all results to the driver and then pass them to Python, allowing this query to be planned using Spark's `CollectLimit` optimizations.
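
For reference, the Python side of the change is roughly of the following shape. This is a minimal sketch, not the exact diff: the helpers `SCCallSiteSync`, `_load_from_socket`, `BatchedSerializer`, and `PickleSerializer` follow PySpark 2.0-era internals and may differ in other versions.

```python
# Sketch of the approach: DataFrame.collect() hands the whole query (including
# any limit) to the JVM via collectToPython(), which plans and collects it
# there and then serves the pickled rows back to Python over a local socket.
from pyspark.rdd import _load_from_socket
from pyspark.serializers import BatchedSerializer, PickleSerializer
from pyspark.traceback_utils import SCCallSiteSync


def collect(self):
    """Return all records as a list of Row, materialized on the driver."""
    with SCCallSiteSync(self._sc):
        # Collecting on the JVM side lets a plan such as limit(1) be executed
        # with CollectLimit / take()-style logic instead of a full shuffle.
        port = self._jdf.collectToPython()
    return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
```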

How was this patch tested?

Added a regression test in sql/tests.py which asserts that the expected number of jobs, stages, and tasks are run for both queries.
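
A rough sketch of the kind of assertion involved, written against the public status-tracker API rather than the exact helpers used in `sql/tests.py` (the function `run_and_count` and the job-group names are made up for illustration):

```python
# Illustrative sketch (not the actual regression test): count the jobs, stages,
# and tasks triggered by a DataFrame action using SparkContext's status tracker.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext


def run_and_count(group_id, action):
    """Run `action` in its own job group and return (jobs, stages, tasks)."""
    sc.setJobGroup(group_id, group_id)
    action()
    tracker = sc.statusTracker()
    job_ids = tracker.getJobIdsForGroup(group_id)
    jobs = [tracker.getJobInfo(j) for j in job_ids]
    stage_ids = [s for j in jobs if j is not None for s in j.stageIds]
    stages = [tracker.getStageInfo(s) for s in stage_ids]
    tasks = sum(s.numTasks for s in stages if s is not None)
    return len(job_ids), len(stage_ids), tasks


df = spark.range(0, 10000, numPartitions=10)

# take(1) should need a single one-task job; after this patch, limit(1).collect()
# should behave comparably instead of running a two-stage job over all partitions.
print(run_and_count("take", lambda: df.take(1)))
print(run_and_count("limit-collect", lambda: df.limit(1).collect()))
```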

Inline comment from @JoshRosen (Contributor, Author) on `collectToPython`:

    private[sql] def collectToPython(): Int = {
      EvaluatePython.registerPicklers()
      withNewExecutionId {
        PythonRDD.collectAndServe(javaToPython.rdd)
      }
    }

Note that collectAndServe was internally calling rdd.collect().iterator, so this patch's change doesn't have any impact on the maximum number of rows that need to be buffered in the JVM; this would not have been the case if the old code used collectAndServeIterator.
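
For context, the buffering trade-off discussed here is the same one visible from the Python API: `collect()` materializes the whole result at once, while `toLocalIterator()` streams it roughly a partition at a time. A minimal illustration, assuming a local `SparkSession` (not part of this patch):

```python
# collect() buffers the entire result set before handing it to Python, whereas
# toLocalIterator() pulls results lazily, so peak memory is bounded by roughly
# the largest partition rather than the whole DataFrame.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.range(0, 100000, numPartitions=8)

all_rows = df.collect()        # full result buffered up front
it = df.toLocalIterator()      # lazy, partition-by-partition
first_ten = [next(it) for _ in range(10)]
print(len(all_rows), first_ten[0])
```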

@JoshRosen (Contributor, Author) commented:

/cc @davies @rxin for review.

@SparkQA commented Sep 13, 2016

Test build #65288 has finished for PR 15068 at commit 6fa9d92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 13, 2016

Test build #65290 has finished for PR 15068 at commit 07e4c8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Sep 13, 2016
## What changes were proposed in this pull request?

`CollectLimit.execute()` incorrectly omits per-partition limits, leading to performance regressions when this code path is hit (which should not happen in normal operation, but can occur in some cases; see #15068 for one example).

## How was this patch tested?

Regression test in SQLQuerySuite that asserts the number of records scanned from the input RDD.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15070 from JoshRosen/SPARK-17515.
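
The effect of omitting the per-partition limit can be restated as a plain-Python sketch over lazy partition iterators (illustrative only, no Spark APIs; function names are invented for this example):

```python
# When a global limit is executed by draining every partition into a single
# "reducer", the per-partition (local) limit is what keeps each map side from
# scanning its whole partition.
from itertools import islice


def make_partition(pid, counter, size=1000):
    """A lazy partition of `size` rows that counts how many rows it yields."""
    for i in range(size):
        counter[0] += 1
        yield (pid, i)


def collect_limit(partitions, n, local_limit):
    shuffled = []
    for part in partitions:
        # With the local limit, at most n rows are pulled from each partition;
        # without it, the partition is drained completely before the shuffle.
        shuffled.extend(islice(part, n) if local_limit else part)
    return shuffled[:n]  # the global limit applied on the single reducer


for local in (False, True):
    scanned = [0]
    parts = [make_partition(p, scanned) for p in range(10)]
    collect_limit(parts, 1, local_limit=local)
    print("local limit:", local, "-> rows scanned:", scanned[0])
# Expected: 10000 rows scanned without the local limit, 10 with it.
```
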
asfgit pushed a commit that referenced this pull request Sep 13, 2016

Closes #15070 from JoshRosen/SPARK-17515.

(cherry picked from commit 3f6a2bb)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
@davies (Contributor) commented Sep 14, 2016

@JoshRosen The current patch looks good to me. Could you also fix the case where a `LocalLimit` is inserted when we turn a DataFrame with a limit into a Python RDD?

@davies (Contributor) commented Sep 14, 2016

@JoshRosen Just saw the other patch, LGTM

@davies (Contributor) commented Sep 14, 2016

Merging this into master and 2.0.

asfgit pushed a commit that referenced this pull request Sep 14, 2016
… same in Python

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15068 from JoshRosen/pyspark-collect-limit.

(cherry picked from commit 6d06ff6)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
@asfgit closed this in 6d06ff6 on Sep 14, 2016
@JoshRosen deleted the pyspark-collect-limit branch on September 14, 2016 17:36
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016

Closes apache#15070 from JoshRosen/SPARK-17515.
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
… same in Python

Closes apache#15068 from JoshRosen/pyspark-collect-limit.