From 58fe17b418c59c4f1dc8b05ef88e3c9a0ad8dab9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Aug 2024 15:58:29 -0600 Subject: [PATCH 01/16] Add option to enable JSON scan --- .../scala/org/apache/comet/CometConf.scala | 13 +++++++-- .../comet/CometSparkSessionExtensions.scala | 27 ++++++++++++------- .../resources/test-data/json-test-1.ndjson | 4 +++ .../exec/CometColumnarShuffleSuite.scala | 2 +- .../apache/comet/exec/CometExecSuite.scala | 9 +++++++ .../org/apache/spark/sql/CometTestBase.scala | 1 + 6 files changed, 44 insertions(+), 12 deletions(-) create mode 100644 spark/src/test/resources/test-data/json-test-1.ndjson diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 1f4874eb6..1fe732b68 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -84,15 +84,24 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean) - val COMET_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled") + val COMET_SCAN_PARQUET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled") .doc( "Whether to enable Comet scan. When this is turned on, Spark will use Comet to read " + - "Parquet data source. Note that to enable native vectorized execution, both this " + + "Parquet data sources. Note that to enable native vectorized execution, both this " + "config and 'spark.comet.exec.enabled' need to be enabled. By default, this config " + "is true.") .booleanConf .createWithDefault(true) + val COMET_SCAN_JSON_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.json.enabled") + .doc( + "Whether to enable Comet scan. When this is turned on, Spark will use Comet to read " + + "JSON data sources. Note that to enable native vectorized execution, both this " + + "config and 'spark.comet.exec.enabled' need to be enabled. By default, this config " + + "is false.") + .booleanConf + .createWithDefault(false) + val COMET_EXEC_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.enabled") .doc( "Whether to enable Comet native vectorized execution for Spark. 
This controls whether " + diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index f24624dcf..38e490e26 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -20,7 +20,6 @@ package org.apache.comet import java.nio.ByteOrder - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit @@ -42,7 +41,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf - import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} @@ -50,6 +48,8 @@ import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde import org.apache.comet.shims.ShimCometSparkSessionExtensions +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat /** * The entry point of Comet extension to Spark. This class is responsible for injecting Comet @@ -1095,7 +1095,7 @@ object CometSparkSessionExtensions extends Logging { } private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = { - COMET_SCAN_ENABLED.get(conf) + COMET_SCAN_PARQUET_ENABLED.get(conf) } private[comet] def isCometExecEnabled(conf: SQLConf): Boolean = { @@ -1131,12 +1131,21 @@ object CometSparkSessionExtensions extends Logging { // operators can have a chance to be converted to columnar. Leaf operators that output // columnar batches, such as Spark's vectorized readers, will also be converted to native // comet batches. - // TODO: consider converting other intermediate operators to columnar. - op.isInstanceOf[LeafExecNode] && CometSparkToColumnarExec.isSchemaSupported(op.schema) && - COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { - val simpleClassName = Utils.getSimpleName(op.getClass) - val nodeName = simpleClassName.replaceAll("Exec$", "") - COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) + op match { + case scan : FileSourceScanExec => scan.relation.fileFormat match { + case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _ => false + } + case _: LeafExecNode => + CometSparkToColumnarExec.isSchemaSupported(op.schema) && + COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { + val simpleClassName = Utils.getSimpleName(op.getClass) + val nodeName = simpleClassName.replaceAll("Exec$", "") + COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) + } + case _ => + // TODO: consider converting other intermediate operators to columnar. 
+ false } } diff --git a/spark/src/test/resources/test-data/json-test-1.ndjson b/spark/src/test/resources/test-data/json-test-1.ndjson new file mode 100644 index 000000000..324b3060e --- /dev/null +++ b/spark/src/test/resources/test-data/json-test-1.ndjson @@ -0,0 +1,4 @@ +{ "a": [1], "b": { "c": "foo" }} +{ "a": 2, "b": { "d": "bar" }} +{ "b": { "d": "bar" }} +{ "a": 3 } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 78b4bbb91..3cf533a6d 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -831,7 +831,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2 SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE - CometConf.COMET_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan + CometConf.COMET_SCAN_PARQUET_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl") val shuffled = df diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 5ca9401c6..cad77111d 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1606,6 +1606,15 @@ class CometExecSuite extends CometTestBase { } }) } + + test("read JSON file") { + withSQLConf( + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true", + CometConf.COMET_SCAN_JSON_ENABLED.key -> "true") { + spark.read.json("spark/src/test/resources/test-data/json-test-1.ndjson").createOrReplaceTempView("tbl") + checkSparkAnswerAndOperator("SELECT a, b.c, b.d FROM tbl") + } + } } case class BucketedTableTestSpec( diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index fd0328c88..d9856d970 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -146,6 +146,7 @@ abstract class CometTestBase sparkPlan = dfSpark.queryExecution.executedPlan } val dfComet = Dataset.ofRows(spark, df.logicalPlan) + val actual = dfComet.collect() checkAnswer(dfComet, expected) (sparkPlan, dfComet.queryExecution.executedPlan) } From 3502124dd86bace771c54271f1f34ea270a6d46d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Aug 2024 16:06:54 -0600 Subject: [PATCH 02/16] support v1 and v2 data sources --- .../apache/comet/CometSparkSessionExtensions.scala | 8 ++++++++ .../org/apache/comet/exec/CometExecSuite.scala | 13 ++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 38e490e26..53b1b09c4 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -50,6 +50,7 @@ import org.apache.comet.serde.QueryPlanSerde import org.apache.comet.shims.ShimCometSparkSessionExtensions import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import 
org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.v2.json.JsonScan /** * The entry point of Comet extension to Spark. This class is responsible for injecting Comet @@ -1132,10 +1133,17 @@ object CometSparkSessionExtensions extends Logging { // columnar batches, such as Spark's vectorized readers, will also be converted to native // comet batches. op match { + // v1 scan case scan : FileSourceScanExec => scan.relation.fileFormat match { case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) case _ => false } + // v2 scan + case scan : BatchScanExec => scan.scan match { + case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _ => false + } + // other leaf nodes case _: LeafExecNode => CometSparkToColumnarExec.isSchemaSupported(op.schema) && COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index cad77111d..6a8da9988 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1608,11 +1608,14 @@ class CometExecSuite extends CometTestBase { } test("read JSON file") { - withSQLConf( - CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true", - CometConf.COMET_SCAN_JSON_ENABLED.key -> "true") { - spark.read.json("spark/src/test/resources/test-data/json-test-1.ndjson").createOrReplaceTempView("tbl") - checkSparkAnswerAndOperator("SELECT a, b.c, b.d FROM tbl") + Seq("", "json").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true", + CometConf.COMET_SCAN_JSON_ENABLED.key -> "true") { + spark.read.json("spark/src/test/resources/test-data/json-test-1.ndjson").createOrReplaceTempView("tbl") + checkSparkAnswerAndOperator("SELECT a, b.c, b.d FROM tbl") + } } } } From 9537ac4681c54e1ff7e1ad16feea878a86d3bff0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Aug 2024 16:14:28 -0600 Subject: [PATCH 03/16] formatting --- docs/source/user-guide/operators.md | 29 ++++++++++--------- .../comet/CometSparkSessionExtensions.scala | 7 +++-- .../org/apache/spark/sql/CometTestBase.scala | 1 - 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/docs/source/user-guide/operators.md b/docs/source/user-guide/operators.md index e3a3ac522..516c4215a 100644 --- a/docs/source/user-guide/operators.md +++ b/docs/source/user-guide/operators.md @@ -22,17 +22,18 @@ The following Spark operators are currently replaced with native versions. Query stages that contain any operators not supported by Comet will fall back to regular Spark execution. -| Operator | Notes | -| -------------------------------------------- | ----- | -| FileSourceScanExec/BatchScanExec for Parquet | | -| Projection | | -| Filter | | -| Sort | | -| Hash Aggregate | | -| Limit | | -| Sort-merge Join | | -| Hash Join | | -| BroadcastHashJoinExec | | -| Shuffle | | -| Expand | | -| Union | | +| Operator | Notes | +| ---------------------------------------------------- | ----------------------------------------------------------------- | +| FileSourceScanExec/BatchScanExec for Parquet sources | | +| FileSourceScanExec/BatchScanExec for JSON sources | Experimental. Set `spark.comet.scan.json.enabled=true` to enable. 
| +| Projection | | +| Filter | | +| Sort | | +| Hash Aggregate | | +| Limit | | +| Sort-merge Join | | +| Hash Join | | +| BroadcastHashJoinExec | | +| Shuffle | | +| Expand | | +| Union | | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 53b1b09c4..9e59620de 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -20,6 +20,7 @@ package org.apache.comet import java.nio.ByteOrder + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit @@ -34,13 +35,16 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.json.JsonScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf + import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} @@ -48,9 +52,6 @@ import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde import org.apache.comet.shims.ShimCometSparkSessionExtensions -import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat -import org.apache.spark.sql.execution.datasources.json.JsonFileFormat -import org.apache.spark.sql.execution.datasources.v2.json.JsonScan /** * The entry point of Comet extension to Spark. 
This class is responsible for injecting Comet diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index d9856d970..fd0328c88 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -146,7 +146,6 @@ abstract class CometTestBase sparkPlan = dfSpark.queryExecution.executedPlan } val dfComet = Dataset.ofRows(spark, df.logicalPlan) - val actual = dfComet.collect() checkAnswer(dfComet, expected) (sparkPlan, dfComet.queryExecution.executedPlan) } From 2eb2527b7722f6bf41ab57f86f8862c8afb8f1fa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Aug 2024 16:26:19 -0600 Subject: [PATCH 04/16] format --- docs/source/user-guide/configs.md | 3 ++- .../comet/CometSparkSessionExtensions.scala | 20 ++++++++++--------- .../apache/comet/exec/CometExecSuite.scala | 4 +++- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 836f64441..f079859e4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -64,7 +64,8 @@ Comet provides the following configuration settings. | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false | | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false | -| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | +| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data sources. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | +| spark.comet.scan.json.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read JSON data sources. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is false. | false | | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false | | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. 
This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 9e59620de..ef90e76d0 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1135,19 +1135,21 @@ object CometSparkSessionExtensions extends Logging { // comet batches. op match { // v1 scan - case scan : FileSourceScanExec => scan.relation.fileFormat match { - case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) - case _ => false - } + case scan: FileSourceScanExec => + scan.relation.fileFormat match { + case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _ => false + } // v2 scan - case scan : BatchScanExec => scan.scan match { - case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) - case _ => false - } + case scan: BatchScanExec => + scan.scan match { + case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _ => false + } // other leaf nodes case _: LeafExecNode => CometSparkToColumnarExec.isSchemaSupported(op.schema) && - COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { + COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { val simpleClassName = Utils.getSimpleName(op.getClass) val nodeName = simpleClassName.replaceAll("Exec$", "") COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 6a8da9988..8915d5a08 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1613,7 +1613,9 @@ class CometExecSuite extends CometTestBase { SQLConf.USE_V1_SOURCE_LIST.key -> v1List, CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true", CometConf.COMET_SCAN_JSON_ENABLED.key -> "true") { - spark.read.json("spark/src/test/resources/test-data/json-test-1.ndjson").createOrReplaceTempView("tbl") + spark.read + .json("spark/src/test/resources/test-data/json-test-1.ndjson") + .createOrReplaceTempView("tbl") checkSparkAnswerAndOperator("SELECT a, b.c, b.d FROM tbl") } } From 8f015dd8b8021dc65100dbdd384618f27f35bc3f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Aug 2024 17:54:51 -0600 Subject: [PATCH 05/16] Add schema check --- .../comet/CometSparkSessionExtensions.scala | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index ef90e76d0..90e9d9a86 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1133,30 +1133,33 @@ object CometSparkSessionExtensions extends Logging { // operators can have a chance to be converted to columnar. Leaf operators that output // columnar batches, such as Spark's vectorized readers, will also be converted to native // comet batches. 
- op match { - // v1 scan - case scan: FileSourceScanExec => - scan.relation.fileFormat match { - case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) - case _ => false - } - // v2 scan - case scan: BatchScanExec => - scan.scan match { - case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) - case _ => false - } - // other leaf nodes - case _: LeafExecNode => - CometSparkToColumnarExec.isSchemaSupported(op.schema) && - COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { - val simpleClassName = Utils.getSimpleName(op.getClass) - val nodeName = simpleClassName.replaceAll("Exec$", "") - COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) - } - case _ => - // TODO: consider converting other intermediate operators to columnar. - false + if (CometSparkToColumnarExec.isSchemaSupported(op.schema)) { + op match { + // v1 scan + case scan: FileSourceScanExec => + scan.relation.fileFormat match { + case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _ => false + } + // v2 scan + case scan: BatchScanExec => + scan.scan match { + case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _ => false + } + // other leaf nodes + case _: LeafExecNode => + COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { + val simpleClassName = Utils.getSimpleName(op.getClass) + val nodeName = simpleClassName.replaceAll("Exec$", "") + COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) + } + case _ => + // TODO: consider converting other intermediate operators to columnar. + false + } + } else { + false } } From f78b87da8b6838a4bfa12b6135fa751a1ba326fd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Aug 2024 21:23:46 -0600 Subject: [PATCH 06/16] fix regression --- .../comet/CometSparkSessionExtensions.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 90e9d9a86..247aaa8c6 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1139,21 +1139,17 @@ object CometSparkSessionExtensions extends Logging { case scan: FileSourceScanExec => scan.relation.fileFormat match { case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) - case _ => false + case _ => isSparkToColumnarEnabled(conf, op) } // v2 scan case scan: BatchScanExec => scan.scan match { case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) - case _ => false + case _ => isSparkToColumnarEnabled(conf, op) } // other leaf nodes case _: LeafExecNode => - COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { - val simpleClassName = Utils.getSimpleName(op.getClass) - val nodeName = simpleClassName.replaceAll("Exec$", "") - COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) - } + isSparkToColumnarEnabled(conf, op) case _ => // TODO: consider converting other intermediate operators to columnar. 
false @@ -1163,6 +1159,14 @@ object CometSparkSessionExtensions extends Logging { } } + private def isSparkToColumnarEnabled(conf: SQLConf, op: SparkPlan) = { + COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { + val simpleClassName = Utils.getSimpleName(op.getClass) + val nodeName = simpleClassName.replaceAll("Exec$", "") + COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) + } + } + def isSpark33Plus: Boolean = { org.apache.spark.SPARK_VERSION >= "3.3" } From 111586fb6eced852ece59d42fffbdbcfa7ea2a5c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 06:57:22 -0600 Subject: [PATCH 07/16] improve configs and docs --- .../scala/org/apache/comet/CometConf.scala | 35 ++++++++++++------- docs/source/index.rst | 5 +-- docs/source/user-guide/configs.md | 5 +-- docs/source/user-guide/datasources.md | 32 +++++++++++++++++ docs/source/user-guide/operators.md | 28 +++++++-------- .../comet/CometSparkSessionExtensions.scala | 8 +++-- .../apache/comet/CometExpressionSuite.scala | 2 +- .../exec/CometColumnarShuffleSuite.scala | 2 +- .../apache/comet/exec/CometExecSuite.scala | 2 +- 9 files changed, 81 insertions(+), 38 deletions(-) create mode 100644 docs/source/user-guide/datasources.md diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 1fe732b68..9b5f47f8b 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -84,23 +84,32 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean) - val COMET_SCAN_PARQUET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled") + val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled") .doc( - "Whether to enable Comet scan. When this is turned on, Spark will use Comet to read " + - "Parquet data sources. Note that to enable native vectorized execution, both this " + - "config and 'spark.comet.exec.enabled' need to be enabled. By default, this config " + - "is true.") + "Whether to enable native scans. When this is turned on, Spark will use Comet to " + + "read supported data sources (currently only Parquet is supported natively). Note " + + "that to enable native vectorized execution, both this config and " + + "'spark.comet.exec.enabled' need to be enabled. By default, this config is true.") .booleanConf .createWithDefault(true) - val COMET_SCAN_JSON_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.json.enabled") - .doc( - "Whether to enable Comet scan. When this is turned on, Spark will use Comet to read " + - "JSON data sources. Note that to enable native vectorized execution, both this " + - "config and 'spark.comet.exec.enabled' need to be enabled. By default, this config " + - "is false.") - .booleanConf - .createWithDefault(false) + val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.convert.parquet.enabled") + .doc( + "When enabled, data from Parquet v1 and v2 scans will be converted to Arrow format. Note " + + "that to enable native vectorized execution, both this config and " + + "'spark.comet.exec.enabled' need to be enabled.") + .booleanConf + .createWithDefault(false) + + val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.convert.json.enabled") + .doc( + "When enabled, data from JSON v1 and v2 scans will be converted to Arrow format. 
Note " + + "that to enable native vectorized execution, both this config and " + + "'spark.comet.exec.enabled' need to be enabled.") + .booleanConf + .createWithDefault(false) val COMET_EXEC_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.enabled") .doc( diff --git a/docs/source/index.rst b/docs/source/index.rst index 101c5785a..4bf5d9fde 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -42,9 +42,10 @@ as a native runtime to achieve improvement in terms of query efficiency and quer Comet Overview Installing Comet - Supported Expressions - Supported Operators + Supported Data Sources Supported Data Types + Supported Operators + Supported Expressions Configuration Settings Compatibility Guide Tuning Guide diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f079859e4..7b0da5bcc 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -30,6 +30,8 @@ Comet provides the following configuration settings. | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 | | spark.comet.columnar.shuffle.memory.factor | Fraction of Comet memory to be allocated per executor process for Comet shuffle. Comet memory size is specified by `spark.comet.memoryOverhead` or calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. By default, this config is 1.0. | 1.0 | +| spark.comet.convert.json.enabled | When enabled, data from JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false | +| spark.comet.convert.parquet.enabled | When enabled, data from Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false | | spark.comet.debug.enabled | Whether to enable debug mode for Comet. By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false | | spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. 
| true | | spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false | @@ -64,8 +66,7 @@ Comet provides the following configuration settings. | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false | | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false | -| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data sources. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | -| spark.comet.scan.json.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read JSON data sources. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is false. | false | +| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false | | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md new file mode 100644 index 000000000..5f6574956 --- /dev/null +++ b/docs/source/user-guide/datasources.md @@ -0,0 +1,32 @@ + + +# Supported Spark Data Sources + +## Parquet + +When `spark.comet.scan.enabled` is enabled, Parquet scans will be performed natively by Comet if all data types +in the schema are supported. 
When this option is not enabled, the scan will fall back to Spark. In this case, +enabling `spark.comet.convert.json.enabled` will immediately convert the data into Arrow format, allowing native +execution to happen after that, but the process may not be efficient. + +## JSON + +Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately +converted into Arrow format, allowing native execution to happen after that. diff --git a/docs/source/user-guide/operators.md b/docs/source/user-guide/operators.md index 516c4215a..7edbb1e70 100644 --- a/docs/source/user-guide/operators.md +++ b/docs/source/user-guide/operators.md @@ -22,18 +22,16 @@ The following Spark operators are currently replaced with native versions. Query stages that contain any operators not supported by Comet will fall back to regular Spark execution. -| Operator | Notes | -| ---------------------------------------------------- | ----------------------------------------------------------------- | -| FileSourceScanExec/BatchScanExec for Parquet sources | | -| FileSourceScanExec/BatchScanExec for JSON sources | Experimental. Set `spark.comet.scan.json.enabled=true` to enable. | -| Projection | | -| Filter | | -| Sort | | -| Hash Aggregate | | -| Limit | | -| Sort-merge Join | | -| Hash Join | | -| BroadcastHashJoinExec | | -| Shuffle | | -| Expand | | -| Union | | +| Operator | Notes | +| --------------------- | ----- | +| Projection | | +| Filter | | +| Sort | | +| Hash Aggregate | | +| Limit | | +| Sort-merge Join | | +| Hash Join | | +| BroadcastHashJoinExec | | +| Shuffle | | +| Expand | | +| Union | | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 247aaa8c6..cb0fbbb27 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1097,7 +1097,7 @@ object CometSparkSessionExtensions extends Logging { } private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = { - COMET_SCAN_PARQUET_ENABLED.get(conf) + COMET_NATIVE_SCAN_ENABLED.get(conf) } private[comet] def isCometExecEnabled(conf: SQLConf): Boolean = { @@ -1138,13 +1138,15 @@ object CometSparkSessionExtensions extends Logging { // v1 scan case scan: FileSourceScanExec => scan.relation.fileFormat match { - case _: JsonFileFormat => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) + case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) case _ => isSparkToColumnarEnabled(conf, op) } // v2 scan case scan: BatchScanExec => scan.scan match { - case _: JsonScan => CometConf.COMET_SCAN_JSON_ENABLED.get(conf) + case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) + case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) case _ => isSparkToColumnarEnabled(conf, op) } // other leaf nodes diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 8eeb998bd..8af49970f 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1976,7 +1976,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("get_struct_field") { withSQLConf( 
CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key -> "true", - CometConf.COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.key -> "FileSourceScan") { + CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "FileSourceScan") { withTempPath { dir => var df = spark .range(5) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 3cf533a6d..a2f07155d 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -831,7 +831,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2 SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE - CometConf.COMET_SCAN_PARQUET_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl") val shuffled = df diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 8915d5a08..fb9777418 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1612,7 +1612,7 @@ class CometExecSuite extends CometTestBase { withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> v1List, CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true", - CometConf.COMET_SCAN_JSON_ENABLED.key -> "true") { + CometConf.COMET_CONVERT_FROM_JSON_ENABLED.key -> "true") { spark.read .json("spark/src/test/resources/test-data/json-test-1.ndjson") .createOrReplaceTempView("tbl") From c998b061162c38505587f93df038d4d856182440 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 06:57:45 -0600 Subject: [PATCH 08/16] fix --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 8af49970f..aed310467 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1976,7 +1976,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("get_struct_field") { withSQLConf( CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key -> "true", - CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "FileSourceScan") { + CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { withTempPath { dir => var df = spark .range(5) From 961e38182b1df664a61406bd8429bc323ddaee36 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 07:00:34 -0600 Subject: [PATCH 09/16] improve test and fix typo in doc --- docs/source/user-guide/datasources.md | 2 +- .../apache/comet/CometExpressionSuite.scala | 45 ++++++++++--------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md index 5f6574956..92c13f671 100644 --- a/docs/source/user-guide/datasources.md +++ b/docs/source/user-guide/datasources.md @@ -23,7 +23,7 @@ When `spark.comet.scan.enabled` is enabled, Parquet scans will be performed natively by Comet 
if all data types in the schema are supported. When this option is not enabled, the scan will fall back to Spark. In this case, -enabling `spark.comet.convert.json.enabled` will immediately convert the data into Arrow format, allowing native +enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into Arrow format, allowing native execution to happen after that, but the process may not be efficient. ## JSON diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index aed310467..270a04f18 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1974,27 +1974,30 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("get_struct_field") { - withSQLConf( - CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key -> "true", - CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { - withTempPath { dir => - var df = spark - .range(5) - // Add both a null struct and null inner value - .select( - when( - col("id") > 1, - struct( - when(col("id") > 2, col("id")).alias("id"), - when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id"))) - .as("nested2"))) - .alias("nested1")) - - df.write.parquet(dir.toString()) - - df = spark.read.parquet(dir.toString()) - checkSparkAnswerAndOperator(df.select("nested1.id")) - checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + Seq("", "parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key -> "true", + CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { + withTempPath { dir => + var df = spark + .range(5) + // Add both a null struct and null inner value + .select( + when( + col("id") > 1, + struct( + when(col("id") > 2, col("id")).alias("id"), + when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id"))) + .as("nested2"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + + df = spark.read.parquet(dir.toString()) + checkSparkAnswerAndOperator(df.select("nested1.id")) + checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + } } } } From 6f47ec29a97d05a1900a6646fa2f2085850d9476 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 07:11:15 -0600 Subject: [PATCH 10/16] renaming some variables but no change to public config names --- .../src/main/scala/org/apache/comet/CometConf.scala | 10 +++++----- .../apache/comet/CometSparkSessionExtensions.scala | 12 ++++++------ .../org/apache/comet/CometExpressionSuite.scala | 2 +- .../scala/org/apache/spark/sql/CometTestBase.scala | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9b5f47f8b..2212450ca 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -459,20 +459,20 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT) - val COMET_SPARK_TO_COLUMNAR_ENABLED: ConfigEntry[Boolean] = + val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.sparkToColumnar.enabled") .internal() - .doc("Whether to enable Spark to Comet columnar conversion. When this is turned on, " + + .doc("Whether to enable Spark to Arrow columnar conversion. 
When this is turned on, " + "Comet will convert operators in " + - "`spark.comet.sparkToColumnar.supportedOperatorList` into Comet columnar based before " + + "`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " + "processing.") .booleanConf .createWithDefault(false) - val COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] = + val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] = conf("spark.comet.sparkToColumnar.supportedOperatorList") .doc( - "A comma-separated list of operators that will be converted to Comet columnar " + + "A comma-separated list of operators that will be converted to Arrow columnar " + "format when 'spark.comet.sparkToColumnar.enabled' is true") .stringConf .toSequence diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index cb0fbbb27..08b9eccee 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1140,18 +1140,18 @@ object CometSparkSessionExtensions extends Logging { scan.relation.fileFormat match { case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) - case _ => isSparkToColumnarEnabled(conf, op) + case _ => isSparkToArrowEnabled(conf, op) } // v2 scan case scan: BatchScanExec => scan.scan match { case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) - case _ => isSparkToColumnarEnabled(conf, op) + case _ => isSparkToArrowEnabled(conf, op) } // other leaf nodes case _: LeafExecNode => - isSparkToColumnarEnabled(conf, op) + isSparkToArrowEnabled(conf, op) case _ => // TODO: consider converting other intermediate operators to columnar. 
false @@ -1161,11 +1161,11 @@ object CometSparkSessionExtensions extends Logging { } } - private def isSparkToColumnarEnabled(conf: SQLConf, op: SparkPlan) = { - COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && { + private def isSparkToArrowEnabled(conf: SQLConf, op: SparkPlan) = { + COMET_SPARK_TO_ARROW_ENABLED.get(conf) && { val simpleClassName = Utils.getSimpleName(op.getClass) val nodeName = simpleClassName.replaceAll("Exec$", "") - COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) + COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName) } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 270a04f18..b22b61b99 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1977,7 +1977,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq("", "parquet").foreach { v1List => withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> v1List, - CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key -> "true", + CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true", CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { withTempPath { dir => var df = spark diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index fd0328c88..a88f9f45c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -79,7 +79,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") - conf.set(CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key, "true") + conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf } From fa4603526265beb32c2a65cb2d142b943cc4ee0c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 07:32:58 -0600 Subject: [PATCH 11/16] improve test --- docs/source/user-guide/configs.md | 2 +- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 7b0da5bcc..883a8d301 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -70,4 +70,4 @@ Comet provides the following configuration settings. | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false | | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. 
| 10.0 | -| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan | +| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Arrow columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan | diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index b22b61b99..45759c73d 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1977,6 +1977,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq("", "parquet").foreach { v1List => withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true", CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { withTempPath { dir => From 69074828351d29dbd018e07f6cd68d11c6bbcdfe Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 07:41:54 -0600 Subject: [PATCH 12/16] improve test --- spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 45759c73d..7c68aaa41 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1978,7 +1978,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> v1List, CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", - CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true", CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { withTempPath { dir => var df = spark From 223427728686d99d8575b0131782eac94434024f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 11:13:43 -0600 Subject: [PATCH 13/16] Update spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala Co-authored-by: Liang-Chi Hsieh --- .../scala/org/apache/comet/CometSparkSessionExtensions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 08b9eccee..480cd955e 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1135,7 +1135,7 @@ object CometSparkSessionExtensions extends Logging { // comet batches. 
if (CometSparkToColumnarExec.isSchemaSupported(op.schema)) { op match { - // v1 scan + // Convert Spark DS v1 scan to Arrow format case scan: FileSourceScanExec => scan.relation.fileFormat match { case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) From a828d8f9524cbf1670b36d921615914cc2d58723 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 11:14:01 -0600 Subject: [PATCH 14/16] Update spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala Co-authored-by: Liang-Chi Hsieh --- .../scala/org/apache/comet/CometSparkSessionExtensions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 480cd955e..6a69ffcf3 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1142,7 +1142,7 @@ object CometSparkSessionExtensions extends Logging { case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) case _ => isSparkToArrowEnabled(conf, op) } - // v2 scan + // onvert Spark DS v2 scan to Arrow format case scan: BatchScanExec => scan.scan match { case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) From dc31a026177dea796affd8444bc69c72808bd9dd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 11:19:30 -0600 Subject: [PATCH 15/16] improve config description --- common/src/main/scala/org/apache/comet/CometConf.scala | 8 ++++---- docs/source/user-guide/configs.md | 4 ++-- .../org/apache/comet/CometSparkSessionExtensions.scala | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 2212450ca..21221fd66 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -96,8 +96,8 @@ object CometConf extends ShimCometConf { val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.convert.parquet.enabled") .doc( - "When enabled, data from Parquet v1 and v2 scans will be converted to Arrow format. Note " + - "that to enable native vectorized execution, both this config and " + + "When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " + + "Arrow format. Note that to enable native vectorized execution, both this config and " + "'spark.comet.exec.enabled' need to be enabled.") .booleanConf .createWithDefault(false) @@ -105,8 +105,8 @@ object CometConf extends ShimCometConf { val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.convert.json.enabled") .doc( - "When enabled, data from JSON v1 and v2 scans will be converted to Arrow format. Note " + - "that to enable native vectorized execution, both this config and " + + "When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " + + "Arrow format. Note that to enable native vectorized execution, both this config and " + "'spark.comet.exec.enabled' need to be enabled.") .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 883a8d301..8465baa7c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -30,8 +30,8 @@ Comet provides the following configuration settings. 
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 | | spark.comet.columnar.shuffle.memory.factor | Fraction of Comet memory to be allocated per executor process for Comet shuffle. Comet memory size is specified by `spark.comet.memoryOverhead` or calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. By default, this config is 1.0. | 1.0 | -| spark.comet.convert.json.enabled | When enabled, data from JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false | -| spark.comet.convert.parquet.enabled | When enabled, data from Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false | +| spark.comet.convert.json.enabled | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false | +| spark.comet.convert.parquet.enabled | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false | | spark.comet.debug.enabled | Whether to enable debug mode for Comet. By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false | | spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true | | spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. 
| false | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 6a69ffcf3..e978e7d36 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1142,7 +1142,7 @@ object CometSparkSessionExtensions extends Logging { case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf) case _ => isSparkToArrowEnabled(conf, op) } - // onvert Spark DS v2 scan to Arrow format + // Convert Spark DS v2 scan to Arrow format case scan: BatchScanExec => scan.scan match { case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf) From 95ce2b62c2a14f843b63d186747c14bc1e749905 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 15 Aug 2024 12:33:45 -0600 Subject: [PATCH 16/16] fix path --- spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index fb9777418..15864a339 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1614,7 +1614,7 @@ class CometExecSuite extends CometTestBase { CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true", CometConf.COMET_CONVERT_FROM_JSON_ENABLED.key -> "true") { spark.read - .json("spark/src/test/resources/test-data/json-test-1.ndjson") + .json("src/test/resources/test-data/json-test-1.ndjson") .createOrReplaceTempView("tbl") checkSparkAnswerAndOperator("SELECT a, b.c, b.d FROM tbl") }
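
---

The sketches below illustrate how the options added in this series can be exercised. They are illustrative only: they assume a build of Comet that includes these patches, they use the final config names from patch 07 onwards, and the plugin class name (org.apache.spark.CometPlugin) comes from the Comet installation guide rather than from anything in this diff. First, a minimal end-to-end run that converts a Spark JSON scan to Arrow:

    import org.apache.spark.sql.SparkSession

    object CometJsonConvertExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local[*]")
          // Plugin class per the Comet installation guide; assumed here.
          .config("spark.plugins", "org.apache.spark.CometPlugin")
          .config("spark.comet.enabled", "true")
          .config("spark.comet.exec.enabled", "true")
          // Final name of the option introduced by this series (patch 07).
          .config("spark.comet.convert.json.enabled", "true")
          .getOrCreate()

        // NDJSON input shaped like the fixture added in patch 01.
        val df = spark.read.json("src/test/resources/test-data/json-test-1.ndjson")
        df.createOrReplaceTempView("tbl")
        // Same projection the test asserts on.
        spark.sql("SELECT a, b.c, b.d FROM tbl").show()

        spark.stop()
      }
    }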
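The four-row NDJSON fixture added in patch 01 deliberately mixes missing top-level fields and sparse nested structs, so Spark's schema inference has to merge schemas across records before the conversion path is ever exercised. A quick way to see the merged schema, assuming the session above:

    // Inference must reconcile rows where "a" and the nested struct fields
    // are present in some records and absent in others.
    spark.read
      .json("src/test/resources/test-data/json-test-1.ndjson")
      .printSchema()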
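The tests loop over SQLConf.USE_V1_SOURCE_LIST with the values "" and "json" because the conversion is wired up separately for the two scan implementations matched in shouldApplySparkToColumnar. A sketch of what that toggle does, assuming an active session:

    val path = "src/test/resources/test-data/json-test-1.ndjson"

    // "json" in the v1 list routes reads through FileSourceScanExec (DS v1);
    // an empty list routes them through BatchScanExec wrapping a JsonScan
    // (DS v2). Both shapes are matched in shouldApplySparkToColumnar.
    spark.conf.set("spark.sql.sources.useV1SourceList", "json")
    val v1Plan = spark.read.json(path).queryExecution.executedPlan

    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    val v2Plan = spark.read.json(path).queryExecution.executedPlan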
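The test also turns on COMET_EXPLAIN_FALLBACK_ENABLED, which makes Comet log a reason for every operator it leaves on the Spark side. The literal key used below is inferred from the constant's naming convention and should be checked against CometConf:

    // Log a reason for each operator that falls back, e.g. when the JSON
    // conversion is disabled or a column type is unsupported.
    spark.conf.set("spark.comet.explainFallback.enabled", "true")
    spark.sql("SELECT a, b.c, b.d FROM tbl").collect()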
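datasources.md describes a fallback mode for Parquet as well: with the native scan disabled, spark.comet.convert.parquet.enabled still gets the Spark reader's output into Arrow so downstream operators can run natively, at the cost of an extra conversion. A sketch, assuming an existing session and a hypothetical /tmp/data.parquet:

    // Disable the native Parquet scan, then convert the Spark reader's
    // output to Arrow so downstream operators still run natively.
    spark.conf.set("spark.comet.scan.enabled", "false")
    spark.conf.set("spark.comet.convert.parquet.enabled", "true")

    val plan = spark.read.parquet("/tmp/data.parquet").queryExecution.executedPlan
    // The conversion should appear as a CometSparkToColumnarExec above the scan.
    println(plan.treeString)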
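For leaf operators other than scans, conversion is gated by the sparkToColumnar configs; per the configs.md table above, the default allow-list is Range,InMemoryTableScan. A sketch converting the output of Spark's Range operator:

    // "Range" must appear in the allow-list (the Exec suffix is stripped
    // before the lookup); Range and InMemoryTableScan are the defaults.
    spark.conf.set("spark.comet.sparkToColumnar.enabled", "true")
    spark.conf.set(
      "spark.comet.sparkToColumnar.supportedOperatorList",
      "Range,InMemoryTableScan")
    spark.range(1000000).selectExpr("sum(id)").show()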
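The allow-list lookup in isSparkToArrowEnabled keys on the physical operator's class name with the Exec suffix stripped. A self-contained sketch of that normalization:

    // Mirrors the normalization in isSparkToArrowEnabled.
    def nodeName(simpleClassName: String): String =
      simpleClassName.replaceAll("Exec$", "")

    assert(nodeName("RangeExec") == "Range")
    assert(nodeName("InMemoryTableScanExec") == "InMemoryTableScan")
    assert(nodeName("FileSourceScanExec") == "FileSourceScan")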
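Patch 05 hoists the schema check so that none of the above applies unless every field in the operator's schema maps to Arrow; unsupported types fall back to Spark regardless of the configs. A sketch probing that gate from test code; the import path for CometSparkToColumnarExec is an assumption, since the extension rule calls the helper unqualified:

    import org.apache.spark.sql.types._
    // Import path assumed; verify against this repository's layout.
    import org.apache.spark.sql.comet.CometSparkToColumnarExec

    // A schema shaped like the JSON fixture: a scalar plus a nested struct.
    val schema = StructType(Seq(
      StructField("a", LongType),
      StructField("b", StructType(Seq(
        StructField("c", StringType),
        StructField("d", StringType))))))
    assert(CometSparkToColumnarExec.isSchemaSupported(schema))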
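Finally, the same ConfigEntry builder pattern would extend naturally to other Spark sources. The key below is purely hypothetical (the series drops its early CSVFileFormat import in patch 03); it only shows the shape of the DSL as it would appear inside CometConf:

    // Hypothetical key, shown only to illustrate the ConfigEntry DSL; it is
    // not part of this series.
    val COMET_CONVERT_FROM_CSV_ENABLED: ConfigEntry[Boolean] =
      conf("spark.comet.convert.csv.enabled")
        .doc(
          "When enabled, data from Spark (non-native) CSV v1 and v2 scans " +
            "will be converted to Arrow format. Note that to enable native " +
            "vectorized execution, both this config and " +
            "'spark.comet.exec.enabled' need to be enabled.")
        .booleanConf
        .createWithDefault(false)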