From e4994ee775cabbe16ddf5a657da8d431db976a2a Mon Sep 17 00:00:00 2001
From: Rahul Babarwal
Date: Sat, 15 Nov 2025 18:05:32 +0530
Subject: [PATCH 1/3] Fix #2710: CometWindowExec should correctly account for
 output attributes

---
 .../scala/org/apache/spark/sql/comet/CometWindowExec.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
index 0a783b922a..17f5c62469 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.comet

 import scala.jdk.CollectionConverters._

-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CurrentRow, Expression, NamedExpression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CurrentRow, Expression, NamedExpression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max, Min, Sum}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
@@ -334,6 +334,7 @@ case class CometWindowExec(
     extends CometUnaryExec {

   override def nodeName: String = "CometWindowExec"
+  override def producedAttributes: AttributeSet = outputSet ++ AttributeSet(windowExpression)

   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),

From 36fc8508b7123f28ab94ebea3d2a429d53b8f249 Mon Sep 17 00:00:00 2001
From: Rahul Babarwal
Date: Sun, 16 Nov 2025 01:56:50 +0530
Subject: [PATCH 2/3] Fix failing tests

---
 docs/source/user-guide/latest/configs.md | 32 ++++++++++++++++++++--------------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 7e3d2a79f7..98291d1ec2 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -58,20 +58,21 @@ Comet provides the following configuration settings.



-| Config | Description | Default Value |
-| ---------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------- |
-| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
-| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
-| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true |
-| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. It can be overridden by the environment variable `ENABLE_COMET`. | true |
-| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
-| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
-| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
-| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
-| `spark.comet.nativeLoadRequired` | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
-| `spark.comet.regexp.allowIncompatible` | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| Config | Description | Default Value |
+| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------- |
+| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
+| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
+| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true |
+| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. It can be overridden by the environment variable `ENABLE_COMET`. | true |
+| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
+| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
+| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
+| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| `spark.comet.expression.allowIncompatible` | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
+| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
+| `spark.comet.nativeLoadRequired` | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
+| `spark.comet.regexp.allowIncompatible` | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |



@@ -89,6 +90,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
 | `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. It can be overridden by the environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |
+| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. It can be overridden by the environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |



@@ -139,11 +141,13 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.convert.json.enabled` | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.convert.parquet.enabled` | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
+| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
 | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
 | `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
 | `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
 | `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
+| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |



From 22808122c6ba8c168617cf10f7545e24fd9dc1c1 Mon Sep 17 00:00:00 2001
From: Rahul Babarwal
Date: Sun, 16 Nov 2025 19:41:53 +0530
Subject: [PATCH 3/3] Fix prettier errors

---
 docs/source/user-guide/latest/configs.md | 32 ++++++++++++++------------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 98291d1ec2..7e3d2a79f7 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -58,21 +58,20 @@ Comet provides the following configuration settings.



-| Config | Description | Default Value |
-| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------- |
-| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
-| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
-| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true |
-| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. It can be overridden by the environment variable `ENABLE_COMET`. | true |
-| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
-| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
-| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| `spark.comet.expression.allowIncompatible` | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
-| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
-| `spark.comet.nativeLoadRequired` | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
-| `spark.comet.regexp.allowIncompatible` | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| Config | Description | Default Value |
+| ---------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------- |
+| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
+| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
+| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true |
+| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. It can be overridden by the environment variable `ENABLE_COMET`. | true |
+| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
+| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec.<operator>.enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true |
+| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
+| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b |
+| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
+| `spark.comet.nativeLoadRequired` | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
+| `spark.comet.regexp.allowIncompatible` | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |



@@ -90,7 +89,6 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
 | `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. It can be overridden by the environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |
-| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. It can be overridden by the environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false |



@@ -141,13 +139,11 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.convert.json.enabled` | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.convert.parquet.enabled` | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
-| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
 | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
 | `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
 | `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
 | `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
-| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
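
A note on the one-line change in PATCH 1/3: Catalyst computes `QueryPlan.missingInput` as `references -- inputSet -- producedAttributes`, and attributes that are neither supplied by an operator's children nor declared as produced by the operator itself are reported as missing input. A window operator emits new columns of its own, so `CometWindowExec` has to declare them, much as Spark's built-in window operators do. Below is a minimal sketch of that bookkeeping in plain Scala (no Spark dependency); `Op`, `produced`, and the `rank` column are illustrative stand-ins, not the Catalyst API.

object ProducedAttributesSketch {
  // An operator references attributes, receives some from its child (inputSet),
  // and may produce new ones itself (window function outputs, for example).
  final case class Op(references: Set[String], inputSet: Set[String], produced: Set[String]) {
    // Mirrors Catalyst's QueryPlan.missingInput: references -- inputSet -- producedAttributes.
    def missingInput: Set[String] = references -- inputSet -- produced
  }

  def main(args: Array[String]): Unit = {
    // A window operator whose output includes a `rank` column it computes itself.
    val before = Op(references = Set("a", "rank"), inputSet = Set("a"), produced = Set.empty)
    println(before.missingInput) // Set(rank): reported as missing input before the fix

    val after = before.copy(produced = Set("rank"))
    println(after.missingInput) // Set(): the window output is accounted for
  }
}

The patch's `outputSet ++ AttributeSet(windowExpression)` is a broader declaration than the toy `produced` set above, but it plays the same role in the missing-input check.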