Draft
Conversation
392d498 to
48521c3
Compare
### What changes were proposed in this pull request? Enabled flake8 [F811](https://www.flake8rules.com/rules/F811.html) check on our repo and fixed reported issues. ### Why are the changes needed? I know upgrading lint system is a pain, but we should not just put it aside forever. Our pinned `flake8` version is not even usable on Python3.12+. During this "lint fix", I actually discovered a few real bugs - most of them are silently disabled unittests because there is a test method that has the same name (probably due to copy/paste). I think this result supported the idea that we should take lint more seriously. About `functions.log`, we got it wrong. It's not because `overload` does not work properly - it's because we have two `log` function in that gigantic file. The former one is [dead](https://app.codecov.io/gh/apache/spark/blob/master/python%2Fpyspark%2Fsql%2Ffunctions%2Fbuiltin.py#L3111). I just removed that one. Again, I really think we should upgrade our lint system. I'm trying to do it slowly - piece by piece, so that people's daily workflow is not impacted too much. I hope we can eventually move to a place where all linters are updated and people can be more confident about their changes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `flake8` test on major directories. CI should give more a comprehensive result. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53253 from gaogaotiantian/flake8-f811. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
5a446a6 to
7cc4020
Compare
…h no BEGIN/END are used
### What changes were proposed in this pull request?
When Exception Handlers which don't have BEGIN-END body are triggered, internal exception `java.util.NoSuchElementException` was thrown instead of executing properly or propagating/raising the new error if it happens in handler.
```
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
SELECT 1;
SELECT 1/0;
END
```
### Why are the changes needed?
Code was encountering a bug which throws internal error for what should be valid user code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `SqlScriptingExecutionSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#53271 from miland-db/milan-dankovic_data/fix-no-body-handlers.
Authored-by: Milan Dankovic <milan.dankovic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…rces ### What changes were proposed in this pull request? Introducing the OffsetMap format to key source progress by source name, as opposed to ordinal in the logical plan ### Why are the changes needed? These changes are needed in order to enable source evolution on a streaming query (adding, removing, reordering sources) without requiring the user to set a new checkpoint directory ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53123 from ericm-db/offset-map. Authored-by: ericm-db <eric.marnadi@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
…th Arrow file format ### What changes were proposed in this pull request? FPGrowth supports local filesystem ### Why are the changes needed? to make FPGrowth work with local filesystem ### Does this PR introduce _any_ user-facing change? yes, FPGrowth will work when local saving mode is one ### How was this patch tested? updated tests ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53232 from zhengruifeng/local_fs_fpg_with_file. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
7cc4020 to
5fa0407
Compare
…ation UDF
### What changes were proposed in this pull request?
This PR introduces an iterator API for Arrow grouped aggregation UDFs in PySpark. It adds support for two new UDF patterns:
- `Iterator[pa.Array] -> Any` for single column aggregations
- `Iterator[Tuple[pa.Array, ...]] -> Any` for multiple column aggregations
The implementation adds a new Python eval type `SQL_GROUPED_AGG_ARROW_ITER_UDF` with corresponding support in type inference, worker serialization, and Scala execution planning.
### Why are the changes needed?
The current Arrow grouped aggregation API requires loading all data for a group into memory at once, which can be problematic for groups with large amounts of data. The iterator API allows processing data in batches, providing:
1. **Memory Efficiency**: Processes data incrementally rather than loading entire group into memory
2. **Consistency**: Aligns with existing iterator APIs (e.g., `SQL_SCALAR_ARROW_ITER_UDF`)
3. **Flexibility**: Allows initialization of expensive state once per group while processing batches iteratively
### Does this PR introduce _any_ user-facing change?
Yes. This PR adds a new API pattern for Arrow grouped aggregation UDFs:
**Single column aggregation:**
```python
import pyarrow as pa
from typing import Iterator
from pyspark.sql.functions import arrow_udf
arrow_udf("double")
def arrow_mean(it: Iterator[pa.Array]) -> float:
sum_val = 0.0
cnt = 0
for v in it:
sum_val += pa.compute.sum(v).as_py()
cnt += len(v)
return sum_val / cnt if cnt > 0 else 0.0
df.groupby("id").agg(arrow_mean(df['v'])).show()
```
**Multiple column aggregation:**
```python
import pyarrow as pa
import numpy as np
from typing import Iterator, Tuple
from pyspark.sql.functions import arrow_udf
arrow_udf("double")
def arrow_weighted_mean(it: Iterator[Tuple[pa.Array, pa.Array]]) -> float:
weighted_sum = 0.0
weight = 0.0
for v, w in it:
weighted_sum += np.dot(v.to_numpy(), w.to_numpy())
weight += pa.compute.sum(w).as_py()
return weighted_sum / weight if weight > 0 else 0.0
df.groupby("id").agg(arrow_weighted_mean(df["v"], df["w"])).show()
```
### How was this patch tested?
Added comprehensive unit tests in `python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py`:
1. `test_iterator_grouped_agg_single_column()` - Tests single column iterator aggregation with `Iterator[pa.Array]`
2. `test_iterator_grouped_agg_multiple_columns()` - Tests multiple column iterator aggregation with `Iterator[Tuple[pa.Array, pa.Array]]`
3. `test_iterator_grouped_agg_eval_type()` - Verifies correct eval type inference from type hints
### Was this patch authored or co-authored using generative AI tooling?
Co-Generated-by: Cursor with Claude Sonnet 4.5
Closes apache#53035 from Yicong-Huang/SPARK-53615/feat/arrow-grouped-agg-iterator-api.
Authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
5fa0407 to
4a48212
Compare
…sed PySpark IPC by default ### What changes were proposed in this pull request? Enable PySpark Arrow-based optimizations by default in Spark 4.2.0, updating default conf values: - Set `spark.sql.execution.pythonUDF.arrow.enabled` and `spark.sql.execution.pythonUDTF.arrow.enabled` to`true` by default to enable Arrow-optimized execution for regular Python UDFs and UDTFs. - Set `spark.sql.execution.arrow.pyspark.enabled` to `true` by default to enable Arrow-based columnar data exchange for PySpark APIs such as DataFrame.toPandas and SparkSession.createDataFrame when the input is a pandas DataFrame or NumPy array. Update user-facing docs and migration guides to reflect the change. ### Why are the changes needed? Arrow’s columnar IPC significantly improves JVM↔Python throughput and reduces serialization/deserialization overhead, speeding up Python UDFs and DataFrame conversions. Additionally, Arrow provides consistent, well-defined rules for type coercion when Python return values differ from declared UDF return types, reducing ambiguous behavior. Enabling arrow by default brings performance and correctness improvements to the majority of PySpark users with minimal configuration. Users who depend on the previous (non-Arrow) implementation can opt out by explicitly setting `spark.sql.execution.pythonUDF.arrow.enabled`, `spark.sql.execution.pythonUTF.arrow.enabled`, and `spark.sql.execution.arrow.pyspark.enabled` to `false`. ### Does this PR introduce _any_ user-facing change? Yes, changes the default configuration of `spark.sql.execution.pythonUDF.arrow.enabled`, `spark.sql.execution.pythonUDTF.arrow.enabled`, and `spark.sql.execution.arrow.pyspark.enabled` to `true` and updates user-facing docs. ### How was this patch tested? Existing PySpark test suites are run with enabling and disabling the arrow conf. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53264 from asl3/enablearrowbydefault. Authored-by: Amanda Liu <amanda.liu@databricks.com> Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
…error conditions ### What changes were proposed in this pull request? This PR migrates three Hive-related legacy error codes to proper error conditions with the `INTERNAL_ERROR_` prefix: 1. **_LEGACY_ERROR_TEMP_2186** → **INTERNAL_ERROR_SERDE_INTERFACE_NOT_FOUND** 2. **_LEGACY_ERROR_TEMP_2187** → **INTERNAL_ERROR_INVALID_HIVE_COLUMN_TYPE** 3. **_LEGACY_ERROR_TEMP_2192** → **INTERNAL_ERROR_INVALID_PARTITION_FILTER_VALUE** ### Why are the changes needed? - Improves error message clarity and consistency - Removes legacy error codes as part of ongoing cleanup effort - Provides better user experience with more descriptive error messages - Follows Spark's error condition naming conventions ### Does this PR introduce _any_ user-facing change? Yes, users will see improved error messages for these three Hive-related error scenarios. ### How was this patch tested? - Existing tests continue to pass - Error message format follows Spark's error condition guidelines - All error parameters are properly defined in error-conditions.json ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.5 Closes apache#53244 from ganeshashree/SPARK-54534. Authored-by: Ganesha S <ganesha.s@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ma comparable ### What changes were proposed in this pull request? In this PR I propose to make `XmlOptions` and `XmlInferSchema` comparable. ### Why are the changes needed? In order to be able to compare them while working on the single-pass implementation (dual-runs). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53268 from mihailoale-db/xmlequalsimplement. Authored-by: mihailoale-db <mihailo.aleksic@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…orrect value ### What changes were proposed in this pull request? Set the metric executorSource.METRIC_RESULT_SIZE after the result size is populated. ### Why are the changes needed? Currently the metric executorSource.METRIC_RESULT_SIZE is set to the value of `task.metrics.resultSize` which is always 0 in that line. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually tested on local Spark. See the Jira attachments for metrics reported before and after fix. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53243 from jiwen624/executor-source-result-size. Authored-by: Eric Yang <jiwen624@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2047eec to
f6c7b6c
Compare
…n for now ### What changes were proposed in this pull request? Disable test_with_none_and_nan for now ### Why are the changes needed? to make CI green ### Does this PR introduce _any_ user-facing change? No, test-only ### How was this patch tested? manually test ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53296 from zhengruifeng/skip_test_with_none_and_nan. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request? Reenable FPGrowth on Connect ### Why are the changes needed? for feature parity ### Does this PR introduce _any_ user-facing change? yes, FPGrowth will be available on connect ### How was this patch tested? updated tests ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53294 from zhengruifeng/fpgrowth_model_size. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request? Skip doctest to restore pyspark-pandas CI, while docs are being updated for arrow by default ### Why are the changes needed? Restore the CI ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53299 from asl3/skip-pyspark-pandas. Authored-by: Amanda Liu <amanda.liu@databricks.com> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request? Remove whitespace to restore docs build ### Why are the changes needed? Fix docs build in CI ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Locally run `make html` from python/docs directory ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53298 from asl3/docbuild. Authored-by: Amanda Liu <amanda.liu@databricks.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
…ataframe from ndarray ### What changes were proposed in this pull request? Avoid unnecessary pandas conversion in create dataframe from ndarray ### Why are the changes needed? before: ndarray -> pandas dataframe -> arrow data after: ndarray -> arrow data and will be consistent with connect mode: https://github.com/apache/spark/blob/40ba971b7319d74670ba86cc1f280a8a0f7a1dbb/python/pyspark/sql/connect/session.py#L675-L706 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53280 from zhengruifeng/test_np_arrow. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…operties` from its own classloader ### What changes were proposed in this pull request? Change SparkBuildInfo to use its own classloader instead of thread context classloader to load `spark-version-info.properties`. ### Why are the changes needed? I hit an issue during the Connect JDBC driver & JetBrains DataGrip integration. ``` 2025-11-25 18:48:09,475 [ 55114] WARN - #c.i.d.d.BaseDatabaseErrorHandler$MissingDriverClassErrorInfo - Exception org.apache.spark.SparkException: Could not find spark-version-info.properties [in thread "RMI TCP Connection(3)-127.0.0.1"] java.lang.ExceptionInInitializerError: Exception org.apache.spark.SparkException: Could not find spark-version-info.properties [in thread "RMI TCP Connection(3)-127.0.0.1"] at org.apache.spark.SparkBuildInfo$.<clinit>(SparkBuildInfo.scala:35) at org.apache.spark.sql.connect.client.SparkConnectClient$.org$apache$spark$sql$connect$client$SparkConnectClient$$genUserAgent(SparkConnectClient.scala:978) at org.apache.spark.sql.connect.client.SparkConnectClient$Configuration$.apply$default$8(SparkConnectClient.scala:999) at org.apache.spark.sql.connect.client.SparkConnectClient$Builder.<init>(SparkConnectClient.scala:683) at org.apache.spark.sql.connect.client.SparkConnectClient$.builder(SparkConnectClient.scala:676) at org.apache.spark.sql.connect.client.jdbc.SparkConnectConnection.<init>(SparkConnectConnection.scala:31) at org.apache.spark.sql.connect.client.jdbc.NonRegisteringSparkConnectDriver.connect(NonRegisteringSparkConnectDriver.scala:36) at com.intellij.database.remote.jdbc.helpers.JdbcHelperImpl.connect(JdbcHelperImpl.java:786) at com.intellij.database.remote.jdbc.impl.RemoteDriverImpl.connect(RemoteDriverImpl.java:47) ``` After adding some debug messages, I found it was caused by using wrong classloader. ``` c.i.e.r.RemoteProcessSupport - ContextClassLoader: com.intellij.database.remote.jdbc.impl.JdbcClassLoader$1559cc356 c.i.e.r.RemoteProcessSupport - SparkBuildInfo ClassLoader: com.intellij.database.remote.jdbc.impl.JdbcClassLoader$JdbcClassLoaderImpl62e93ea8 ``` Similar issue that affects Hive JDBC driver and Spark's Isolated Classloader (see SPARK-32256) was fixed by [HADOOP-14067](https://issues.apache.org/jira/browse/HADOOP-14067) ### Does this PR introduce _any_ user-facing change? This fixes corner issues that the application uses multiple classloaders with Spark libs. ### How was this patch tested? Pass GHA to ensure the change breaks nothing, also manually verified the Connect JDBC driver & JetBrains DataGrip integration. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53279 from pan3793/SPARK-54565. Lead-authored-by: Cheng Pan <chengpan@apache.org> Co-authored-by: Cheng Pan <pan3793@gmail.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
…erver output stream to files ### What changes were proposed in this pull request? Currently, the Spark Connect test server's stdout and stderr are discarded when SPARK_DEBUG_SC_JVM_CLIENT=false, making it difficult to debug test failures. This PR enables log4j logging for Test Spark Connect server in all test modes (both debug and non-debug) by always configuring log4j2.properties. ### Why are the changes needed? When `SPARK_DEBUG_SC_JVM_CLIENT=false` SparkConnectJdbcDataTypeSuite randomly hangs because the child server process blocks on write() calls when stdout/stderr pipe buffers fill up. Without consuming the output, the buffers reach capacity and cause the process to block indefinitely. Instead of `Redirect.DISCARD` , redirect the logs into log4j files ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested and confirmed that log files are created when 1) `SPARK_DEBUG_SC_JVM_CLIENT=false build/sbt "connect-client-jdbc/testOnly org.apache.spark.sql.connect.client.jdbc.SparkConnectJdbcDataTypeSuite"` OR 2) `SPARK_DEBUG_SC_JVM_CLIENT=true build/sbt "connect-client-jdbc/testOnly org.apache.spark.sql.connect.client.jdbc.SparkConnectJdbcDataTypeSuite"` ``` In this file ./target/unit-tests.log ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53275 from vinodkc/br_redirect_stdout_stderr_to_file. Authored-by: vinodkc <vinod.kc.in@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nt/wait cost ### What changes were proposed in this pull request? When ShuffleBlockFetcherIterator fetch data, two shuffle cost not calculated. 1. Network resource congestion and waiting between `fetchUpToMaxBytes` and `fetchAllHostLocalBlocks` ; 2. Connection establishment congestion. When `fetchUpToMaxBytes` and `fetchAllHostLocalBlocks` send request, create client may be congestion ### Why are the changes needed? Make shuffle fetch wait time request time more accurate. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? For open block request add a Thread.sleep(3000) latency, shuffle read metrics like below <img width="1724" height="829" alt="截屏2025-11-27 17 38 26" src="https://github.com/user-attachments/assets/99f3822d-d5a7-4f4a-abfc-cc272e61667c" /> ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53245 from AngersZhuuuu/SPARK-54536. Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? In this PR I propose to make `QueryPlanningTracker` as `HybridAnalyzer` field. ### Why are the changes needed? In order to simplify the code and further single-pass analyzer development. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53277 from mihailoale-db/analyzertracker. Authored-by: mihailoale-db <mihailo.aleksic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…for Single ColFamily ### What changes were proposed in this pull request? Introducing a new StatePartitionReader - StatePartitionReaderAllColumnFamilies to support offline repartition. StatePartitionReaderAllColumnFamilies is invoked when user specify option `readAllColumnFamilies` to true. We have the StateDataSource Reader, which allows customers to read the rows in an operator state store using the DataFrame API, just like they read a normal table. But it currently only supports reading one column family in the state store at a time. We would introduce a change to allow reading all the state rows in all the column families, so that we can repartition them at once. This would allow us to read the entire state store, repartition the rows, and then save the new repartition state rows to the cloud. This also has a perf impact, since we don’t have to read each column family separately. We would read the state based on the last committed batch version. Since each column family can have a different schema, the DataFrame we will return will treat the key and value row as bytes - - partition_key (string) - key_bytes (binary) - value_bytes (binary) - column_family_name (string) ### Why are the changes needed? See above ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? See unit test. It not only verify the schema, but also validate the data are serialized to bytes correctly by comparing them against the normal queried data frame ### Was this patch authored or co-authored using generative AI tooling? Yes. haiku, sonnet. Closes apache#53104 from zifeif2/repartition-reader-single-cf. Lead-authored-by: zifeif2 <zifeifeng11@gmail.com> Co-authored-by: Ubuntu <zifei.feng@your.hostname.com> Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
…gate functions ### What changes were proposed in this pull request? This PR adds comprehensive documentation for Spark SQL's sketch-based approximate functions powered by the Apache DataSketches library. The new documentation page (`sql-ref-sketch-aggregates.md`) covers: **Function Reference:** - **HyperLogLog (HLL) Sketch Functions**: `hll_sketch_agg`, `hll_union_agg`, `hll_sketch_estimate`, `hll_union` - **Theta Sketch Functions**: `theta_sketch_agg`, `theta_union_agg`, `theta_intersection_agg`, `theta_sketch_estimate`, `theta_union`, `theta_intersection`, `theta_difference` - **KLL Quantile Sketch Functions**: `kll_sketch_agg_*`, `kll_sketch_to_string_*`, `kll_sketch_get_n_*`, `kll_sketch_merge_*`, `kll_sketch_get_quantile_*`, `kll_sketch_get_rank_*` - **Approximate Top-K Functions**: `approx_top_k_accumulate`, `approx_top_k_combine`, `approx_top_k_estimate` **Best Practices:** - Guidance on choosing between HLL and Theta sketches - Accuracy vs. memory trade-offs for each sketch type - Tips for storing and reusing sketches **Common Use Cases and Examples:** - Tracking daily unique users with HLL sketches (ETL workflow) - Computing percentiles over time with KLL sketches - Set operations with Theta sketches (intersection, difference for cohort analysis) - Finding trending items with Top-K sketches The PR also adds links to this new documentation page from: - `sql-ref-functions.md` (under Aggregate-like Functions) - `sql-ref.md` (under Functions section) - `_data/menu-sql.yaml` (navigation menu) ### Why are the changes needed? Spark SQL has added several sketch-based approximate functions using the Apache DataSketches library (HLL sketches in 3.5.0, Theta/KLL/Top-K sketches in 4.1.0), but there was no comprehensive documentation explaining: - How to use these functions together in practical ETL workflows - How to store sketches and merge them across multiple data batches - Best practices for choosing the right sketch type and tuning accuracy parameters This documentation fills that gap and helps users understand the full power of sketch-based analytics in Spark SQL. ### Does this PR introduce _any_ user-facing change? Yes, this PR adds new documentation pages that are user-facing. No code changes are included. ### How was this patch tested? Documentation-only change. The examples were verified against the existing function implementations and test cases in the codebase. ### Was this patch authored or co-authored using generative AI tooling? Yes, code assistance with `claude-4.5-opus-high` in combination with manual editing by the author. Closes apache#53297 from dtenedor/sketch-function-docs. Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com> Signed-off-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
….test_with_none_and_nan` ### What changes were proposed in this pull request? There was a bug in create dataframe from ndarray containing NaN values: NaN was incorrectly converted to Null when arrow-optimization is on, it happened to be resolved in apache#53280 ### Why are the changes needed? for test coverage ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53305 from zhengruifeng/reenable_test_with_none_and_nan. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request? Optimize Py4J calls in schema inference ### Why are the changes needed? to fetch all configs in single py4j call ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53300 from zhengruifeng/py4j_infer_schema. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…lizer` with `GroupPandasUDFSerializer` ### What changes were proposed in this pull request? This PR consolidates `GroupPandasUDFSerializer` to support both `SQL_GROUPED_MAP_PANDAS_UDF` and `SQL_GROUPED_MAP_PANDAS_ITER_UDF`, aligning with the design pattern used by `GroupArrowUDFSerializer`. ### Why are the changes needed? When `Iterator[pandas.DataFrame]` API was added to `groupBy().applyInPandas()` in SPARK-53614 (apache#52716), a new `GroupPandasIterUDFSerializer` class was created. However, this class is nearly identical to `GroupPandasUDFSerializer`, differing only in whether batches are processed lazily (iterator mode) or all at once (regular mode). ### Does this PR introduce _any_ user-facing change? No, this is an internal refactoring that maintains backward compatibility. The API behavior remains the same from the user's perspective. ### How was this patch tested? Existing test cases. ### Was this patch authored or co-authored using generative AI tooling? Co-Generated-by: Cursor with Claude 4.5 Sonnet Closes apache#53043 from Yicong-Huang/SPARK-54316/refactor/consolidate-pandas-iter-serializer. Lead-authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Co-authored-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request? This PR improves CAST error messages to provide suggestions based on whether casts work in ANSI mode, non-ANSI mode, or try_cast(): 1. Suggest try_cast() when the cast is valid but fails in ANSI mode and try_cast() supports it 2. Suggest disabling ANSI mode when the cast works in non-ANSI mode but try_cast() doesn't support it 3. Provide no suggestion when the cast is invalid in all modes ### Why are the changes needed? In Spark 4.0.0, ANSI mode is now enabled by default. The previous error messages suggested users disable ANSI mode when encountering cast failures, which goes against the direction of the project. However, completely removing these suggestions would leave users without guidance when migrating code from Spark 3.x that relied on non-ANSI cast behavior. ### Does this PR introduce _any_ user-facing change? Yes. Error messages for CAST operations that fail in ANSI mode now suggest using `try_cast()` instead of disabling ANSI mode, but only when `try_cast()` actually supports that cast operation. **Before:** ``` cannot cast "ARRAY<INT>" to "ARRAY<BINARY>" with ANSI mode on. If you have to cast ARRAY<INT> to ARRAY<BINARY>, you can set "spark.sql.ansi.enabled" as 'false'. ``` **After (for casts supported by try_cast):** ``` cannot cast "ARRAY<INT>" to "ARRAY<BINARY>". To convert values from "ARRAY<INT>" to "ARRAY<BINARY>", you can use the functions `try_cast` instead. ``` **After (suggests disabling ANSI when try_cast doesn't support it):** ``` cannot cast "TIMESTAMP" to "BOOLEAN" with ANSI mode on. This cast is not allowed in ANSI mode but works when "spark.sql.ansi.enabled" is 'false'. ``` **After (for casts not supported by try_cast):** ``` cannot cast "INT" to "BINARY". ``` ### How was this patch tested? - Added new test case for complex type casts (Array[Int] to Array[Binary]) that demonstrates try_cast suggestion - Added test case for TIMESTAMP → BOOLEAN cast that demonstrates config suggestion - Updated existing test expectations in CastWithAnsiOnSuite to reflect the new behavior - All Cast-related unit tests pass - Verified no regression in legacy (ANSI off) mode - Manually tested with spark-shell to verify error messages ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53295 from qlong/SPARK-49635-remove-ansi-suggestion. Authored-by: Qiegang Long <qlong@Qiegangs-MacBook-Pro.local> Signed-off-by: Gengliang Wang <gengliang@apache.org>
5fbf32c to
bdadb02
Compare
### What changes were proposed in this pull request? Just remove some unnecessary import in a test file. ### Why are the changes needed? Clean up the code. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53409 from gaogaotiantian/top-level-import. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…its own coverage object ### What changes were proposed in this pull request? Enforce the forked worker process to have it's own coverage object. This is used by the after fork hook for `coveragepy` itself. ### Why are the changes needed? It seems like some coverage data of workers was not properly recorded. Might be a racing issue. I applied this patch and confirmed that the worker coverage is properly recorded. We should see a better coverage on worker side. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Locally confirmed the new coverage. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53408 from gaogaotiantian/fix-worker-coverage. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
We have patterns like
```python
q.processAllAvailable()
q.awaitTermination(10)
```
which will make the test wait for 10 seconds to timeout - it is such a waste and it's causing our daily coverage run to timeout. We should `stop()` the query before `awaitTermination` and save these 10 seconds - there are a lot of them.
### Why are the changes needed?
It's a waste time and it's causing our CIs to timeout.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Locally it runs much faster (without the unnecessary wait).
### Was this patch authored or co-authored using generative AI tooling?
Closes apache#53394 from gaogaotiantian/stop-before-await.
Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This reverts commit 7e53ce8.
… string for test names" This reverts commit 7191a14.
### What changes were proposed in this pull request? Add `ruff` as an option to lint our code. ### Why are the changes needed? Our pinned `flake8` version is just too old - it can't even run on 3.12+. We can upgrade flake8 version but I think gradually switch to `ruff` is a better options. The main reason is `ruff` is much much faster than `flake8`. `ruff` returns the result almost immediately (ms-level) on whole spark repo - which means we can even hook it in the pre-commit in the future. It is surprisingly compatible with flake8 - there's almost no code change needed (with two extra ignored lint types which we can fix in the future). Everything it finds is a real issue instead of a different taste. `ruff` can also serve as a black-compatible formatter which means we can probably ditch both `flake8` and `black` in the future. For now we only enable this option - it's not hooked into any CI or full `./dev/lint-python`. However, I think we should do that soon. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Local lint test passed. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53378 from gaogaotiantian/add-ruff. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
bdadb02 to
e562be7
Compare
…ric views ### What changes were proposed in this pull request? This PR adds the complete serialization/deserialization infrastructure for parsing metric view YAML definitions: - Add Jackson YAML dependencies to pom.xml - Implement canonical model for metric views: - Column, Expression (Dimension/Measure), MetricView, Source - YAMLVersion validation and exception types - Implement version-specific serde (v0.1): - YAML deserializer/serializer - Base classes for extensibility - Add JSON utilities for metadata serialization ### Why are the changes needed? [SPIP: Metrics & semantic modeling in Spark](https://docs.google.com/document/d/1xVTLijvDTJ90lZ_ujwzf9HvBJgWg0mY6cYM44Fcghl0/edit?tab=t.0#heading=h.4iogryr5qznc) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` build/sbt "catalyst/testOnly org.apache.spark.sql.metricview.serde.MetricViewFactorySuite" ``` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreplyanthropic.com> Closes apache#53146 from linhongliu-db/metric-view-serde. Authored-by: Linhong Liu <linhong.liu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…type` to check the series dtype ### What changes were proposed in this pull request? Apply `pandas.api.types.is_integer_dtype` to check the series dtype ### Why are the changes needed? `pandas.api.types.is_integer_dtype` is a public pandas API, which is the standard way to check the dtype ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53416 from zhengruifeng/apply_stand_is_integer_dtype. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…nts.txt` ### What changes were proposed in this pull request? Add `viztracer` and `debugpy` in `dev/requirements.txt` ### Why are the changes needed? for code contributors, the new packages will be installed in `pip install -r dev/requirements.txt` ### Does this PR introduce _any_ user-facing change? no, dev-only ### How was this patch tested? manually check `pip install -r dev/requirements.txt` ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53419 from zhengruifeng/req_debug. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? A minor typo fix to documentation. ### Why are the changes needed? Clearer documentation for users. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53285 from kepler62f/master. Authored-by: kepler62f <blue.kepler62f@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…rom `connect.StreamingQueryManager` ### What changes were proposed in this pull request? apache#41752 introduced a `listenerCache` and related private methods (`cacheListenerById`, `getIdByListener`, and `removeCachedListener`) for `connect.StreamingQueryManager`. However, in apache#46287, the usage related to `listenerCache` was replaced by `streamingQueryListenerBus`. As a result, `listenerCache` and its associated private methods are no longer in use, and this current pr cleans them up. ### Why are the changes needed? Code cleanup. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass Github Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53420 from LuciferYang/StreamingQueryManager. Lead-authored-by: yangjie01 <yangjie01@baidu.com> Co-authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? Add some additional end to end tests for RTM ### Why are the changes needed? To have better test coverage for RTM functionality ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A. Only tests are added ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#52870 from jerrypeng/SPARK-53998-2. Authored-by: Jerry Peng <jerry.peng@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ted state ### What changes were proposed in this pull request? 1. Modifies `ChecksumCancellableFSDataOutputStream.cancel()` to cancel both the main stream and checksum stream synchronously instead of using Futures with awaitResult. 2. Moves `changelogWriter.foreach(_.abort())` and `changelogWriter = None` in a try finally block within `RocksDB.rollback()`. ### Why are the changes needed? For fix 1: When cancel() is called while the thread is in an interrupted state (e.g., during task cancellation), the previous implementation would fail. The code submitted Futures to cancel each stream, then called awaitResult() to wait for completion. However, awaitResult() checks the thread's interrupt flag and throws InterruptedException immediately if the thread is interrupted. For fix 2: Consider the case where `abort()` is called on `RocksDBStateStoreProvider`. This calls `rollback()` on the `RocksDB` instance, which in turn calls `changelogWriter.foreach(_.abort())` and then sets `changelogWriter = None`. However, if `changelogWriter.abort()` throws an exception, the finally block still sets `backingFileStream` and `compressedStream` to `null`. The exception propagates, and we never reach the line that sets `changelogWriter = None`. This leaves the RocksDB instance in an inconsistent state: - changelogWriter = Some(changelogWriterWeAttemptedToAbort) - changelogWriterWeAttemptedToAbort.backingFileStream = null - changelogWriterWeAttemptedToAbort.compressedStream = null Now consider calling `RocksDB.load()` again. This calls `replayChangelog()`, which calls `put()`, which calls `changelogWriter.put()`. At this point, the assertion `assert(compressedStream != null)` fails, causing an exception while loading the StateStore. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added test `"SPARK-54585: Interrupted task calling rollback does not throw an exception"` which simulates the case when a thread in the interrupted state and begins a rollback ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53313 from dylanwong250/SPARK-54585. Authored-by: Dylan Wong <dylan.wong@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
### What changes were proposed in this pull request? Add sanity check for number of configurations being passed. ### Why are the changes needed? This is helpful to recognize malformed message - avoid potential deadlock when the message does not conform to protocol. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This error should not happen and it should not break CI either. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53359 from gaogaotiantian/runner-conf-sanity-check. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…viztracer` ### What changes were proposed in this pull request? Add link and examples for `run-with-viztracer` ### Why are the changes needed? to make it easier for developers to have a try ### Does this PR introduce _any_ user-facing change? dev-only changes ### How was this patch tested? manually check ```sh (spark_dev_313) ➜ spark git:(doc_viz) python/run-with-viztracer -h Usage: run-with-viztracer your_original_commands To view the profiling results, run: vizviewer pyspark_*.json Environment: If SPARK_VIZTRACER_OUTPUT_DIR is set, the output will be saved to the directory. Otherwise, it will be saved to the current directory. Requirements: - viztracer must be installed (pip install viztracer) Check the following documentation for more information on using viztracer: https://viztracer.readthedocs.io/en/latest/ Examples: - Start pyspark shell python/run-with-viztracer bin/pyspark --conf spark.driver.memory=16g - Start pyspark shell in Connect mode python/run-with-viztracer bin/pyspark --remote local ``` ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53418 from zhengruifeng/doc_viz. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
ba86638 to
1373ae1
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Introducing StatePartitionAllColumnFamiliesWriter as part of the offline repartition project. In this PR, we only support a single-column-family operator.
This writer takes the repartitioned DataFrame returned from StatePartitionAllColumnFamiliesReader and writes it to a new version in the state store. See the comments for the DataFrame schema. In addition, this writer does not load previous state (since we are overwriting the state with the repartitioned data), and when committing, it will always commit a snapshot.
Major Changes
Why are the changes needed?
This will be used in offline repartitioning
Does this PR introduce any user-facing change?
No
How was this patch tested?
Integration tests in
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionAllColumnFamiliesWriterSuite.scalaUnit tests in
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scalaWas this patch authored or co-authored using generative AI tooling?
Yes. Sonnet 4.5