[SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches
### What changes were proposed in this pull request?

This is a followup of apache/spark#43623 to fix a regression. A `With` expression inside a conditional branch may not be evaluated at all, so we should not pull its common expressions out into a `Project`; we should just inline them.
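As a rough illustration (not the PR's actual test case, and it assumes `nullif` is one of the expressions the analyzer rewrites using `With`), the `With` below sits inside a `CASE WHEN` branch and may never be evaluated for a given row, so pulling its common expressions into a `Project` would force needless work:

```scala
// Hedged sketch: the nullif() call is assumed to be rewritten into a With
// expression. Because it lives inside a CASE WHEN branch, it is only evaluated
// for rows where id > 5, so its common expressions should be inlined rather
// than pulled out into a Project that runs for every row.
// Assumes `spark` is the active SparkSession (e.g. in spark-shell).
val df = spark.range(10).selectExpr(
  "CASE WHEN id > 5 THEN nullif(id % 3, 0) ELSE -1 END AS c")
df.queryExecution.optimizedPlan  // inspect: no extra Project for the common expressions
```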

### Why are the changes needed?

Avoid a performance regression.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43978 from cloud-fan/with.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
a0x8o committed Nov 28, 2023
1 parent c28593b commit 4817e7d
Showing 38 changed files with 1,006 additions and 139 deletions.
@@ -1,2 +1,2 @@
Project [encode(g#0, UTF-8) AS encode(g, UTF-8)#0]
Project [encode(g#0, UTF-8, false) AS encode(g, UTF-8)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
@@ -1,2 +1,2 @@
Project [encode(g#0, UTF-8) AS to_binary(g, utf-8)#0]
Project [encode(g#0, UTF-8, false) AS to_binary(g, utf-8)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
@@ -2563,6 +2563,8 @@ class SparkConnectPlanner(
// To avoid explicit handling of the result on the client, we build the expected input
// of the relation on the server. The client has to simply forward the result.
val result = SqlCommandResult.newBuilder()
// Only filled when isCommand
val metrics = ExecutePlanResponse.Metrics.newBuilder()
if (isCommand) {
// Convert the results to Arrow.
val schema = df.schema
@@ -2596,10 +2598,10 @@
proto.LocalRelation
.newBuilder()
.setData(ByteString.copyFrom(bytes))))
metrics.addAllMetrics(MetricGenerator.transformPlan(df).asJava)
} else {
// Trigger assertExecutedPlanPrepared to ensure post ReadyForExecution before finished
// executedPlan is currently called by createMetricsResponse below
df.queryExecution.assertExecutedPlanPrepared()
// No execution triggered for relations. Manually set ready
tracker.setReadyForExecution()
result.setRelation(
proto.Relation
.newBuilder()
@@ -2622,8 +2624,17 @@
.setSqlCommandResult(result)
.build())

// Send Metrics
responseObserver.onNext(MetricGenerator.createMetricsResponse(sessionHolder, df))
// Send metrics when isCommand (e.g. SHOW TABLES), which is eagerly executed and has metrics
// Skip metrics when !isCommand (e.g. SELECT 1), which is not executed and has no metrics
if (isCommand) {
responseObserver.onNext(
ExecutePlanResponse
.newBuilder()
.setSessionId(sessionHolder.sessionId)
.setServerSideSessionId(sessionHolder.serverSessionId)
.setMetrics(metrics.build)
.build)
}
}

private def handleRegisterUserDefinedFunction(
@@ -51,6 +51,12 @@ private[connect] object MetricGenerator extends AdaptiveSparkPlanHelper {
allChildren(p).flatMap(c => transformPlan(c, p.id))
}

private[connect] def transformPlan(
rows: DataFrame): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
val executedPlan = rows.queryExecution.executedPlan
transformPlan(executedPlan, executedPlan.id)
}

private def transformPlan(
p: SparkPlan,
parentId: Int): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
7 changes: 1 addition & 6 deletions dev/create-release/spark-rm/Dockerfile
@@ -37,12 +37,7 @@ ENV DEBCONF_NONINTERACTIVE_SEEN true
# These arguments are just for reuse and not really meant to be customized.
ARG APT_INSTALL="apt-get install --no-install-recommends -y"

# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
# See also https://github.com/sphinx-doc/sphinx/issues/7551.
# We should use the latest Sphinx version once this is fixed.
# TODO(SPARK-35375): Jinja2 3.0.0+ causes error when building with Sphinx.
# See also https://issues.apache.org/jira/browse/SPARK-35375.
ARG PIP_PKGS="sphinx==3.0.4 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.8.0 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==2.11.3 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==1.5.3 pyarrow==3.0.0 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.59.3 protobuf==4.21.6 grpcio-status==1.59.3 googleapis-common-protos==1.56.4"
ARG PIP_PKGS="sphinx==4.2.0 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.13.3 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==3.1.2 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==1.5.3 pyarrow==3.0.0 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.59.3 protobuf==4.21.6 grpcio-status==1.59.3 googleapis-common-protos==1.56.4"
ARG GEM_PKGS="bundler:2.3.8"

# Install extra needed repos and refresh.
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -45,7 +45,7 @@ commons-crypto/1.1.0//commons-crypto-1.1.0.jar
commons-dbcp/1.4//commons-dbcp-1.4.jar
commons-io/2.15.0//commons-io-2.15.0.jar
commons-lang/2.6//commons-lang-2.6.jar
commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-lang3/3.14.0//commons-lang3-3.14.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
commons-math3/3.6.1//commons-math3-3.6.1.jar
commons-pool/1.5.4//commons-pool-1.5.4.jar
1 change: 1 addition & 0 deletions dev/infra/Dockerfile
@@ -138,4 +138,5 @@ RUN python3.12 -m pip install numpy 'pyarrow>=14.0.0' 'six==1.16.0' 'pandas<=2.1
RUN python3.12 -m pip install 'grpcio==1.59.3' 'grpcio-status==1.59.3' 'protobuf==4.25.1' 'googleapis-common-protos==1.56.4'
# TODO(SPARK-46078) Use official one instead of nightly build when it's ready
RUN python3.12 -m pip install --pre torch --index-url https://download.pytorch.org/whl/nightly/cpu
RUN python3.12 -m pip install torchvision --index-url https://download.pytorch.org/whl/cpu
RUN python3.12 -m pip install torcheval
7 changes: 0 additions & 7 deletions docs/README.md
@@ -52,13 +52,6 @@ Note: If you are on a system with both Ruby 1.9 and Ruby 2.0 you may need to rep

To generate SQL and Python API docs, you'll need to install these libraries:

<!--
TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
See also https://github.com/sphinx-doc/sphinx/issues/7551.
TODO(SPARK-35375): Jinja2 3.0.0+ causes error when building with Sphinx.
See also https://issues.apache.org/jira/browse/SPARK-35375.
-->
Run the following command from $SPARK_HOME:
```sh
$ pip install --upgrade -r dev/requirements.txt
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -29,6 +29,7 @@ license: |
- Since Spark 4.0, `spark.sql.hive.metastore` drops support for Hive versions prior to 2.0.0, as they require JDK 8, which Spark no longer supports. Users should migrate to higher versions.
- Since Spark 4.0, `spark.sql.parquet.compression.codec` drops support for the codec name `lz4raw`; please use `lz4_raw` instead.
- Since Spark 4.0, when a value overflows while casting a timestamp to byte/short/int under non-ANSI mode, Spark returns null instead of the wrapped-around value.
- Since Spark 4.0, the `encode()` function supports only the following charsets: 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'. To restore the previous behavior, where the function accepted any charset of the JDK used by Spark, set `spark.sql.legacy.javaCharsets` to `true`, as sketched below.
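A minimal sketch of restoring the legacy behavior ('Shift_JIS' here is just an assumed example of a JDK charset that is not in the list above):

{% highlight scala %}
// Restore the pre-4.0 behavior: encode() accepts any charset supported by the
// JDK that Spark runs on. 'Shift_JIS' is an assumed example charset.
spark.conf.set("spark.sql.legacy.javaCharsets", "true")
spark.sql("SELECT encode('Spark', 'Shift_JIS') AS bytes").show()
{% endhighlight %}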

## Upgrading from Spark SQL 3.4 to 3.5

8 changes: 8 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -2452,6 +2452,14 @@ Specifically for built-in HDFS state store provider, users can check the state s
it is best if the cache miss count is minimized, which means Spark won't waste too much time loading checkpointed state.
Users can increase the Spark locality waiting configurations to avoid loading state store providers on different executors across batches.

#### State Data Source (Experimental)

Apache Spark provides a streaming-state data source with the ability to manipulate state stores in the checkpoint. Users can run a batch query with the State Data Source to gain visibility into the state of an existing streaming query.

As of Spark 4.0, the data source only supports reading state. See [State Data Source Integration Guide](structured-streaming-state-data-source.html) for more details.

NOTE: this data source is currently marked as experimental; source options and the behavior (output) are subject to change.
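For instance, a minimal read of the state as a batch DataFrame might look like the following (a sketch; `<checkpointLocation>` stands for the streaming query's checkpoint root, and the available options are described in the integration guide):

{% highlight scala %}
// Read the state of the (default) stateful operator from a streaming
// checkpoint as a batch DataFrame.
val stateDf = spark.read
  .format("statestore")
  .load("<checkpointLocation>")
stateDf.show()
{% endhighlight %}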

## Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter`
([Python](api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter)/[Scala](api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html) docs)
248 changes: 248 additions & 0 deletions docs/structured-streaming-state-data-source.md
@@ -0,0 +1,248 @@
---
layout: global
displayTitle: State Data Source Integration Guide
title: State Data Source Integration Guide
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

State Data Source Guide in Structured Streaming (Experimental)

## Overview

The state data source provides functionality to manipulate the state stored in the checkpoint.

As of Spark 4.0, the state data source provides read functionality with a batch query. Additional functionality, including write support, is on the roadmap.

NOTE: this data source is currently marked as experimental; source options and the behavior (output) are subject to change.

## Reading state key-values from the checkpoint

The state data source enables reading key-value pairs from the state store in the checkpoint by running a separate batch query.
Users can leverage this functionality to cover the two major use cases described below:

* Constructing a test that checks both the output and the state. It is non-trivial to deduce the key-value pairs of the state from the output, so having visibility into the state is a huge win for testing.
* Investigating an incident in a stateful streaming query. If users observe incorrect output and want to track down how it was produced, visibility into the state is required.

Users can read an instance of a state store, which in most cases maps to a single stateful operator. This means users can expect to read all of the key-value pairs in the state for that operator.

Note that there can be exceptions, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and
provides a user-friendly approach to reading the state. See the section on stream-stream join for more details.

### Creating a State store for Batch Queries (all defaults)

<div class="codetabs">

<div data-lang="python" markdown="1">
{% highlight python %}

df = spark \
.read \
.format("statestore") \
.load("<checkpointLocation>")

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

val df = spark
.read
.format("statestore")
.load("<checkpointLocation>")

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

Dataset<Row> df = spark
.read()
.format("statestore")
.load("<checkpointLocation>");

{% endhighlight %}
</div>

</div>

Each row in the source has the following schema:

<table class="table table-striped">
<thead><tr><th>Column</th><th>Type</th><th>Note</th></tr></thead>
<tr>
<td>key</td>
<td>struct (depends on the type for state key)</td>
<td></td>
</tr>
<tr>
<td>value</td>
<td>struct (depends on the type for state value)</td>
<td></td>
</tr>
<tr>
<td>_partition_id</td>
<td>int</td>
<td>metadata column (hidden unless specified with SELECT)</td>
</tr>
</table>

The nested columns for key and value depend heavily on the input schema of the stateful operator as well as the type of the operator.
Users are encouraged to query the schema via df.schema() / df.printSchema() first to understand the shape of the output.
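For example, a quick sketch of inspecting the schema before writing queries against it:

{% highlight scala %}
// The key/value structs differ per operator, so print the schema first.
val stateDf = spark.read
  .format("statestore")
  .load("<checkpointLocation>")
stateDf.printSchema()
{% endhighlight %}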

The following options must be set for the source.

<table class="table table-striped">
<thead><tr><th>Option</th><th>value</th><th>meaning</th></tr></thead>
<tr>
<td>path</td>
<td>string</td>
<td>Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`).</td>
</tr>
</table>

The following configurations are optional:

<table class="table table-striped">
<thead><tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr></thead>
<tr>
<td>batchId</td>
<td>numeric value</td>
<td>latest committed batch</td>
<td>Represents the target batch to read from. This option is used when users want to perform time-travel. The batch should be committed but not yet cleaned up.</td>
</tr>
<tr>
<td>operatorId</td>
<td>numeric value</td>
<td>0</td>
<td>Represents the target operator to read from. This option is used when the query is using multiple stateful operators.</td>
</tr>
<tr>
<td>storeName</td>
<td>string</td>
<td>DEFAULT</td>
<td>Represents the target state store name to read from. This option is used when the stateful operator uses multiple state store instances. It is not required except for stream-stream join.</td>
</tr>
<tr>
<td>joinSide</td>
<td>string ("left" or "right")</td>
<td>(none)</td>
<td>Represents the target side to read from. This option is used when users want to read the state from a stream-stream join.</td>
</tr>
</table>
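As a sketch, the optional settings above can be combined like this (the option values are illustrative only):

{% highlight scala %}
// Time-travel to batch 5 of the second stateful operator (operatorId = 1).
val stateDf = spark.read
  .format("statestore")
  .option("batchId", "5")
  .option("operatorId", "1")
  .load("<checkpointLocation>")
{% endhighlight %}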

### Reading state for Stream-stream join

Structured Streaming implements the stream-stream join feature by leveraging multiple state store instances internally.
These instances logically compose the buffers that store the input rows for the left and right sides.

Since it is easier for users to reason about the join in terms of its sides, the data source provides the option 'joinSide' to read the buffered input for a specific side of the join.
To enable reading an internal state store instance directly, we also allow specifying the option 'storeName', with the restriction that 'storeName' and 'joinSide' cannot be specified together.
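For example, a sketch of reading the buffered input rows of the left side of a stream-stream join:

{% highlight scala %}
// Read the left side's buffered input; use "right" for the other side.
val leftSideState = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("<checkpointLocation>")
{% endhighlight %}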

## State metadata source

Before querying the state from an existing checkpoint via the state data source, users may want to understand the information in the checkpoint, especially about the stateful operators. This includes which operators and state store instances are available in the checkpoint, the available range of batch IDs, etc.

Structured Streaming provides a data source named "State metadata source" that exposes the state-related metadata information from the checkpoint.

Note: The metadata is constructed when the streaming query runs with Spark 4.0+. An existing checkpoint that has only been run with a lower Spark version does not have the metadata and cannot be queried with this metadata source. You must run the streaming query against the existing checkpoint with Spark 4.0+ to construct the metadata before querying it.

### Creating a State metadata store for Batch Queries

<div class="codetabs">

<div data-lang="python" markdown="1">
{% highlight python %}

df = spark \
.read \
.format("state-metadata") \
.load("<checkpointLocation>")

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

val df = spark
.read
.format("state-metadata")
.load("<checkpointLocation>")

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

Dataset<Row> df = spark
.read()
.format("state-metadata")
.load("<checkpointLocation>");

{% endhighlight %}
</div>

</div>

Each row in the source has the following schema:

<table class="table table-striped">
<thead><tr><th>Column</th><th>Type</th><th>Note</th></tr></thead>
<tr>
<td>operatorId</td>
<td>int</td>
<td></td>
</tr>
<tr>
<td>operatorName</td>
<td>string</td>
<td></td>
</tr>
<tr>
<td>stateStoreName</td>
<td>string</td>
<td></td>
</tr>
<tr>
<td>numPartitions</td>
<td>int</td>
<td></td>
</tr>
<tr>
<td>minBatchId</td>
<td>int</td>
<td>The minimum batch ID available for querying state. The value could be invalid if the streaming query that writes to the checkpoint is running, as cleanup may remove older batches.</td>
</tr>
<tr>
<td>maxBatchId</td>
<td>int</td>
<td>The maximum batch ID available for querying state. The value could be invalid if the streaming query that writes to the checkpoint is running, as the query will commit further batches.</td>
</tr>
<tr>
<td>_numColsPrefixKey</td>
<td>int</td>
<td>metadata column (hidden unless specified with SELECT)</td>
</tr>
</table>

One of the major use cases of this data source is to identify the operatorId to query when the query has multiple stateful operators, e.g. stream-stream join followed by deduplication.
The column 'operatorName' helps users identify the operatorId for a given operator.

Additionally, if users want to query an internal state store instance for a stateful operator (e.g. stream-stream join), the column 'stateStoreName' would be useful to determine the target.
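As a sketch of this workflow (the operatorId value below is illustrative and should be taken from the metadata output):

{% highlight scala %}
// 1. List the stateful operators recorded in the checkpoint.
val metadataDf = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")
metadataDf.select("operatorId", "operatorName", "stateStoreName").show()

// 2. Read the state of the operator identified above (e.g. operatorId = 1).
val stateDf = spark.read
  .format("statestore")
  .option("operatorId", "1")
  .load("<checkpointLocation>")
{% endhighlight %}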
2 changes: 1 addition & 1 deletion pom.xml
@@ -197,7 +197,7 @@
<!-- org.apache.commons/commons-lang/-->
<commons-lang2.version>2.6</commons-lang2.version>
<!-- org.apache.commons/commons-lang3/-->
<commons-lang3.version>3.13.0</commons-lang3.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<!-- org.apache.commons/commons-pool2/-->
<commons-pool2.version>2.11.1</commons-pool2.version>
<datanucleus-core.version>4.1.17</datanucleus-core.version>
3 changes: 3 additions & 0 deletions python/docs/source/_templates/spark_footer.html
@@ -0,0 +1,3 @@
<p class="copyright">
{{copyright}} The Apache Software Foundation, Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
</p>