feat: Add specific configs for converting Spark Parquet and JSON data to Arrow #832

Merged: 16 commits, Aug 16, 2024
38 changes: 28 additions & 10 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -84,15 +84,33 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(sys.env.getOrElse("ENABLE_COMET", "true").toBoolean)

val COMET_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
val COMET_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.enabled")
.doc(
"Whether to enable Comet scan. When this is turned on, Spark will use Comet to read " +
"Parquet data source. Note that to enable native vectorized execution, both this " +
"config and 'spark.comet.exec.enabled' need to be enabled. By default, this config " +
"is true.")
"Whether to enable native scans. When this is turned on, Spark will use Comet to " +
"read supported data sources (currently only Parquet is supported natively). Note " +
"that to enable native vectorized execution, both this config and " +
"'spark.comet.exec.enabled' need to be enabled. By default, this config is true.")
.booleanConf
.createWithDefault(true)

val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.convert.parquet.enabled")
.doc(
"When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " +
"Arrow format. Note that to enable native vectorized execution, both this config and " +
"'spark.comet.exec.enabled' need to be enabled.")
.booleanConf
.createWithDefault(false)

val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.convert.json.enabled")
.doc(
"When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " +
"Arrow format. Note that to enable native vectorized execution, both this config and " +
"'spark.comet.exec.enabled' need to be enabled.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.enabled")
.doc(
"Whether to enable Comet native vectorized execution for Spark. This controls whether " +
@@ -441,20 +459,20 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)

val COMET_SPARK_TO_COLUMNAR_ENABLED: ConfigEntry[Boolean] =
val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.sparkToColumnar.enabled")
Comment on lines +462 to +463 (Contributor):
Do you prefer that name/should we update the config to sparkToArrow while we're making these updates?

Member (Author):
I don't have a strong feeling whether we should change the config key or not. I think the description sufficiently explains what this does and I didn't want to cause extra work for people who are already using these configs.

If others think we should change it then I am fine with that too.

.internal()
.doc("Whether to enable Spark to Comet columnar conversion. When this is turned on, " +
.doc("Whether to enable Spark to Arrow columnar conversion. When this is turned on, " +
"Comet will convert operators in " +
"`spark.comet.sparkToColumnar.supportedOperatorList` into Comet columnar based before " +
"`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " +
"processing.")
.booleanConf
.createWithDefault(false)

val COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
conf("spark.comet.sparkToColumnar.supportedOperatorList")
.doc(
"A comma-separated list of operators that will be converted to Comet columnar " +
"A comma-separated list of operators that will be converted to Arrow columnar " +
"format when 'spark.comet.sparkToColumnar.enabled' is true")
.stringConf
.toSequence
5 changes: 3 additions & 2 deletions docs/source/index.rst
@@ -42,9 +42,10 @@ as a native runtime to achieve improvement in terms of query efficiency and quer

Comet Overview <user-guide/overview>
Installing Comet <user-guide/installation>
Supported Expressions <user-guide/expressions>
Supported Operators <user-guide/operators>
Supported Data Sources <user-guide/datasources>
Supported Data Types <user-guide/datatypes>
Supported Operators <user-guide/operators>
Supported Expressions <user-guide/expressions>
Configuration Settings <user-guide/configs>
Compatibility Guide <user-guide/compatibility>
Tuning Guide <user-guide/tuning>
6 changes: 4 additions & 2 deletions docs/source/user-guide/configs.md
@@ -30,6 +30,8 @@ Comet provides the following configuration settings.
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config, Comet will use this config as the number of shuffle threads per executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
| spark.comet.columnar.shuffle.memory.factor | Fraction of Comet memory to be allocated per executor process for Comet shuffle. Comet memory size is specified by `spark.comet.memoryOverhead` or calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. By default, this config is 1.0. | 1.0 |
| spark.comet.convert.json.enabled | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.convert.parquet.enabled | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
| spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
@@ -64,8 +66,8 @@ Comet provides the following configuration settings.
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Arrow columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
32 changes: 32 additions & 0 deletions docs/source/user-guide/datasources.md
@@ -0,0 +1,32 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Supported Spark Data Sources

## Parquet

When `spark.comet.scan.enabled` is enabled, Parquet scans are performed natively by Comet if all data types
in the schema are supported. When this option is not enabled, the scan falls back to Spark. In this case,
enabling `spark.comet.convert.parquet.enabled` will immediately convert the scan output into Arrow format, allowing
native execution to happen after that, but the extra conversion step adds overhead and may be less efficient than a
fully native scan.
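
For illustration, here is a minimal sketch of a session configured for this fallback path. It assumes Comet is
already on the classpath and registered with Spark (see the installation guide); the master, application name, and
file path below are placeholders, not values taken from Comet itself.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: disable the native Comet Parquet scan and instead convert the output
// of Spark's own Parquet scan to Arrow, so downstream operators can run natively.
val spark = SparkSession.builder()
  .master("local[*]") // for local experimentation only
  .appName("comet-parquet-convert-sketch")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .config("spark.comet.scan.enabled", "false") // fall back to Spark's Parquet scan
  .config("spark.comet.convert.parquet.enabled", "true") // convert scan output to Arrow
  .getOrCreate()

// Placeholder path; any Parquet file with supported data types behaves the same way.
val df = spark.read.parquet("/tmp/example-data.parquet")
println(df.count())
```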

## JSON

Comet does not provide a native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, the output of
Spark's (non-native) JSON scan is immediately converted into Arrow format, allowing native execution to happen
after that.
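
A similar sketch for JSON, this time setting the conversion config at runtime on an existing Comet-enabled session
named `spark`; the NDJSON path is a placeholder.

```scala
// Sketch: turn on JSON-to-Arrow conversion at runtime, then read an NDJSON file
// (one JSON record per line) and query it through Spark SQL.
spark.conf.set("spark.comet.convert.json.enabled", "true")
spark.conf.set("spark.comet.exec.enabled", "true")

// Placeholder path; the schema is inferred from the JSON records.
val json = spark.read.json("/tmp/example-data.ndjson")
json.createOrReplaceTempView("json_tbl")
spark.sql("SELECT count(*) FROM json_tbl").show()
```
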
27 changes: 13 additions & 14 deletions docs/source/user-guide/operators.md
@@ -22,17 +22,16 @@
The following Spark operators are currently replaced with native versions. Query stages that contain any operators
not supported by Comet will fall back to regular Spark execution. A short sketch showing how to inspect a query plan
for these replacements appears after the tables below.

| Operator | Notes |
| -------------------------------------------- | ----- |
| FileSourceScanExec/BatchScanExec for Parquet | |
| Projection | |
| Filter | |
| Sort | |
| Hash Aggregate | |
| Limit | |
| Sort-merge Join | |
| Hash Join | |
| BroadcastHashJoinExec | |
| Shuffle | |
| Expand | |
| Union | |
| Operator | Notes |
| --------------------- | ----- |
| Projection | |
| Filter | |
| Sort | |
| Hash Aggregate | |
| Limit | |
| Sort-merge Join | |
| Hash Join | |
| BroadcastHashJoinExec | |
| Shuffle | |
| Expand | |
| Union | |
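
One way to confirm whether an operator was actually replaced for a given query is to inspect the physical plan. The
sketch below is illustrative only: the path and column name are placeholders, and the exact operator names in the
output depend on the query and configuration, but operators replaced by Comet generally carry a `Comet` prefix.

```scala
// Sketch: run a simple query and print its physical plan; operators that Comet
// replaced typically appear with a "Comet" prefix (e.g. CometProject, CometSort).
val df = spark.read
  .parquet("/tmp/example-data.parquet") // placeholder path
  .filter("id > 10")                    // placeholder column
  .groupBy("id")
  .count()

df.explain() // prints the physical plan to stdout
```
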
@@ -35,8 +35,10 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
@@ -1095,7 +1097,7 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
COMET_SCAN_ENABLED.get(conf)
COMET_NATIVE_SCAN_ENABLED.get(conf)
}

private[comet] def isCometExecEnabled(conf: SQLConf): Boolean = {
@@ -1131,12 +1133,39 @@ object CometSparkSessionExtensions extends Logging {
// operators can have a chance to be converted to columnar. Leaf operators that output
// columnar batches, such as Spark's vectorized readers, will also be converted to native
// comet batches.
// TODO: consider converting other intermediate operators to columnar.
op.isInstanceOf[LeafExecNode] && CometSparkToColumnarExec.isSchemaSupported(op.schema) &&
COMET_SPARK_TO_COLUMNAR_ENABLED.get(conf) && {
if (CometSparkToColumnarExec.isSchemaSupported(op.schema)) {
op match {
// Convert Spark DS v1 scan to Arrow format
case scan: FileSourceScanExec =>
scan.relation.fileFormat match {
case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
Member:
@parthchandra This is what we discussed.

case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
case _ => isSparkToArrowEnabled(conf, op)
}
// Convert Spark DS v2 scan to Arrow format
case scan: BatchScanExec =>
scan.scan match {
case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
case _ => isSparkToArrowEnabled(conf, op)
}
Comment on lines +1146 to +1151 (Contributor):
Is it intended that the new options take precedence over the old operator list in COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST? If someone has already enabled all data source scans using that list, this change will disable it for Parquet and JSON.

Maybe it would be less surprising if the logic was:

  private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
    // Only consider converting leaf nodes to columnar currently, so that all the following
    // operators can have a chance to be converted to columnar. Leaf operators that output
    // columnar batches, such as Spark's vectorized readers, will also be converted to native
    // comet batches.
    CometSparkToColumnarExec.isSchemaSupported(op.schema) && (
      isSparkToArrowEnabledOp(conf, op) || isSparkToArrowEnabledDataSource(conf, op))
  }

  private def isSparkToArrowEnabledDataSource(conf: SQLConf, op: SparkPlan): Boolean = {
    op match {
      // Convert Spark DS v1 scan to Arrow format
      case scan: FileSourceScanExec =>
        scan.relation.fileFormat match {
          case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
          case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
          case _ => false
        }
      // Convert Spark DS v2 scan to Arrow format
      case scan: BatchScanExec =>
        scan.scan match {
          case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
          case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
          case _ => false
        }
    }
  }

  private def isSparkToArrowEnabledOp(conf: SQLConf, op: SparkPlan) = {
    op.isInstanceOf[LeafExecNode] && COMET_SPARK_TO_ARROW_ENABLED.get(conf) && {
      val simpleClassName = Utils.getSimpleName(op.getClass)
      val nodeName = simpleClassName.replaceAll("Exec$", "")
      COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName)
    }
  }

So that we do not change the behavior of existing configs.

// other leaf nodes
case _: LeafExecNode =>
isSparkToArrowEnabled(conf, op)
case _ =>
// TODO: consider converting other intermediate operators to columnar.
false
}
} else {
false
}
}

private def isSparkToArrowEnabled(conf: SQLConf, op: SparkPlan) = {
COMET_SPARK_TO_ARROW_ENABLED.get(conf) && {
val simpleClassName = Utils.getSimpleName(op.getClass)
val nodeName = simpleClassName.replaceAll("Exec$", "")
COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName)
COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName)
}
}

4 changes: 4 additions & 0 deletions spark/src/test/resources/test-data/json-test-1.ndjson
@@ -0,0 +1,4 @@
{ "a": [1], "b": { "c": "foo" }}
{ "a": 2, "b": { "d": "bar" }}
{ "b": { "d": "bar" }}
{ "a": 3 }
45 changes: 24 additions & 21 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1974,27 +1974,30 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("get_struct_field") {
withSQLConf(
CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key -> "true",
CometConf.COMET_SPARK_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST.key -> "FileSourceScan") {
withTempPath { dir =>
var df = spark
.range(5)
// Add both a null struct and null inner value
.select(
when(
col("id") > 1,
struct(
when(col("id") > 2, col("id")).alias("id"),
when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id")))
.as("nested2")))
.alias("nested1"))

df.write.parquet(dir.toString())

df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
Seq("", "parquet").foreach { v1List =>
Member (Author): I added this line so that we test with both v1 and v2 sources.

withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
withTempPath { dir =>
var df = spark
.range(5)
// Add both a null struct and null inner value
.select(
when(
col("id") > 1,
struct(
when(col("id") > 2, col("id")).alias("id"),
when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id")))
.as("nested2")))
.alias("nested1"))

df.write.parquet(dir.toString())

df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
}
}
}
}
@@ -831,7 +831,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE
CometConf.COMET_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl")
val shuffled = df
14 changes: 14 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1606,6 +1606,20 @@ class CometExecSuite extends CometTestBase {
}
})
}

test("read JSON file") {
Seq("", "json").foreach { v1List =>
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true",
CometConf.COMET_CONVERT_FROM_JSON_ENABLED.key -> "true") {
spark.read
.json("src/test/resources/test-data/json-test-1.ndjson")
.createOrReplaceTempView("tbl")
checkSparkAnswerAndOperator("SELECT a, b.c, b.d FROM tbl")
}
}
}
}

case class BucketedTableTestSpec(
@@ -79,7 +79,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_COLUMNAR_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf
}