feat: Add specific configs for converting Spark Parquet and JSON data to Arrow #832
Conversation
@Kimahriman @eejbyfeldt Could you review please? This extends your earlier work to make it more accessible to users and allows us to support native execution on JSON data sources. I'll create a separate PR for CSV support.
df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
Seq("", "parquet").foreach { v1List =>
I added this line so that we test with both v1 and v2 sources
Hmmm do you need separate configs per type? Or should there just be a dedicated "convert from non-supported scan" config that covers all input formats?
That would probably be OK, but there is the potential case where users have multiple file types and there is a bug or performance issue with one specific type, so it would be beneficial to just disable that type. Also, at some point we will likely want to enable individual formats by default once they are well tested and performant.
The other alternative in that case would be something similar to useV1SourceList: have a single config with a list of formats to apply the conversion to. I don't have a strong opinion either way.
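If the list-style approach were chosen, a minimal sketch might look like the following. The key name, and the stringConf/toSequence builder methods (borrowed from Spark's ConfigBuilder conventions), are assumptions for illustration only.
// Hypothetical single list-style config instead of one flag per format.
// The key name and the stringConf/toSequence helpers are assumptions here.
val COMET_CONVERT_SOURCE_FORMATS: ConfigEntry[Seq[String]] =
  conf("spark.comet.convert.sourceFormats")
    .doc(
      "Comma-separated list of data source formats (e.g. 'parquet,json') whose scans " +
        "should be converted to Arrow format when Comet's native scan is not used.")
    .stringConf
    .toSequence
    .createWithDefault(Nil)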
val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.convert.parquet.enabled")
    .doc(
      "When enabled, data from Parquet v1 and v2 scans will be converted to Arrow format. Note " +
        "that to enable native vectorized execution, both this config and " +
        "'spark.comet.exec.enabled' need to be enabled.")
    .booleanConf
    .createWithDefault(false)
This might be confusing alongside COMET_NATIVE_SCAN_ENABLED. We can probably mention in the doc that this doesn't use the Comet native scan, but rather the Spark Parquet reader, and converts the output for Comet.
I added this in the docs:
## Parquet
When `spark.comet.scan.enabled` is enabled, Parquet scans will be performed natively by Comet if all data types
in the schema are supported. When this option is not enabled, the scan will fall back to Spark. In this case,
enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into Arrow format, allowing native
execution to happen after that, but the process may not be efficient.
I'll take another pass at the config description though to make it more detailed
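As a usage illustration, opting into the fallback conversion from a Spark session could look like the sketch below; the JSON key is assumed by analogy with the Parquet key shown above.
// Minimal sketch: enable Comet native execution plus the fallback conversion configs.
// "spark.comet.convert.json.enabled" is assumed to mirror the Parquet key.
spark.conf.set("spark.comet.exec.enabled", "true")
spark.conf.set("spark.comet.convert.parquet.enabled", "true")
spark.conf.set("spark.comet.convert.json.enabled", "true")

// The scan itself still runs through Spark's readers; the output is converted to
// Arrow batches so downstream operators can execute natively in Comet.
val df = spark.read.parquet("/path/to/data.parquet")
df.selectExpr("count(*)").show()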
// v1 scan
case scan: FileSourceScanExec =>
  scan.relation.fileFormat match {
    case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
@parthchandra This is what we discussed.
val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.sparkToColumnar.enabled")
Do you prefer that name, or should we update the config to sparkToArrow while we're making these updates?
I don't have a strong feeling whether we should change the config key or not. I think the description sufficiently explains what this does and I didn't want to cause extra work for people who are already using these configs.
If others think we should change it then I am fine with that too.
…ons.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
…ons.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Sorry about the slow review. I did not get time until today. I only have one minor comment on how the new configuration might break the existing way of enabling this.
case scan: BatchScanExec =>
  scan.scan match {
    case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
    case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
    case _ => isSparkToArrowEnabled(conf, op)
  }
Is it intended that the new options take precedence over the old operator list in COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST? If someone already enabled all data source scans using that list, this change will disable it for Parquet and JSON.
Maybe it would be less surprising if the logic was:
private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
  // Only consider converting leaf nodes to columnar currently, so that all the following
  // operators can have a chance to be converted to columnar. Leaf operators that output
  // columnar batches, such as Spark's vectorized readers, will also be converted to native
  // comet batches.
  CometSparkToColumnarExec.isSchemaSupported(op.schema) &&
    (isSparkToArrowEnabledOp(conf, op) || isSparkToArrowEnabledDataSource(conf, op))
}

private def isSparkToArrowEnabledDataSource(conf: SQLConf, op: SparkPlan): Boolean = {
  op match {
    // Convert Spark DS v1 scan to Arrow format
    case scan: FileSourceScanExec =>
      scan.relation.fileFormat match {
        case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
        case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
        case _ => false
      }
    // Convert Spark DS v2 scan to Arrow format
    case scan: BatchScanExec =>
      scan.scan match {
        case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
        case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
        case _ => false
      }
    // Other leaf nodes are covered by the operator-list config instead
    case _ => false
  }
}

private def isSparkToArrowEnabledOp(conf: SQLConf, op: SparkPlan) = {
  op.isInstanceOf[LeafExecNode] && COMET_SPARK_TO_ARROW_ENABLED.get(conf) && {
    val simpleClassName = Utils.getSimpleName(op.getClass)
    val nodeName = simpleClassName.replaceAll("Exec$", "")
    COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.get(conf).contains(nodeName)
  }
}
So that we do not change the behavior of existing configs.
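To illustrate the point, a user who had already opted in through the operator list would keep that behavior under the logic above. A sketch of that existing setup, with operator names taken from the PR description below:
// Existing opt-in via the operator-list config; unchanged by the proposal above.
spark.conf.set("spark.comet.sparkToColumnar.enabled", "true")
spark.conf.set("spark.comet.sparkToColumnar.supportedOperatorList", "FileSourceScan,BatchScan")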
… to Arrow (apache#832)
* Add option to enable JSON scan
* support v1 and v2 data sources
* formatting
* format
* Add schema check
* fix regression
* improve configs and docs
* fix
* improve test and fix typo in doc
* renaming some variables but no change to public config names
* improve test
* improve test
* Update spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
* Update spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
* improve config description
* fix path
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Which issue does this PR close?
N/A
Rationale for this change
We already supported reading JSON, but the user would need to know to add FileSourceScan and/or BatchScan to the existing spark.comet.sparkToColumnar.supportedOperatorList configuration setting, and this is not documented. We also did not have any tests for this use case. This PR adds a specific spark.comet.scan.json.enabled config.
What changes are included in this PR?
How are these changes tested?
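For reference, a test for the JSON path might be sketched along the lines below, modeled on the Parquet test quoted earlier in the diff; the test name, helpers, and exec config key are assumptions following common Spark/Comet test conventions.
// Hypothetical test sketch; withSQLConf/withTempPath and the exec config key are assumed
// to follow the conventions of the existing Comet test suites.
test("convert JSON scan to Arrow and execute natively") {
  withSQLConf(
    CometConf.COMET_CONVERT_FROM_JSON_ENABLED.key -> "true",
    "spark.comet.exec.enabled" -> "true") {
    withTempPath { dir =>
      spark.range(10).selectExpr("id", "id * 2 AS doubled").write.json(dir.toString)
      val df = spark.read.json(dir.toString)
      checkSparkAnswerAndOperator(df.select("id", "doubled"))
    }
  }
}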