Skip to content

Commit

Permalink
feat: Only allow incompatible cast expressions to run in comet if a c…
Browse files Browse the repository at this point in the history
…onfig is enabled (#362)
  • Loading branch information
andygrove authored May 3, 2024
1 parent 5fc6327 commit 2741ae7
Show file tree
Hide file tree
Showing 13 changed files with 663 additions and 181 deletions.
49 changes: 8 additions & 41 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.apache.comet

import java.io.{BufferedOutputStream, FileOutputStream}
import java.util.concurrent.TimeUnit

import scala.collection.mutable.ListBuffer
import scala.io.Source

import org.apache.spark.network.util.ByteUnit
import org.apache.spark.network.util.JavaUtils
Expand Down Expand Up @@ -376,12 +374,14 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf(
"spark.comet.cast.stringToTimestamp")
.doc(
"Comet is not currently fully compatible with Spark when casting from String to Timestamp.")
.booleanConf
.createWithDefault(false)
val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.cast.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all cast operations. " +
"Set this config to true to allow them anyway. See compatibility guide " +
"for more information.")
.booleanConf
.createWithDefault(false)

}

Expand Down Expand Up @@ -625,36 +625,3 @@ private[comet] case class ConfigBuilder(key: String) {
private object ConfigEntry {
val UNDEFINED = "<undefined>"
}

/**
* Utility for generating markdown documentation from the configs.
*
* This is invoked when running `mvn clean package -DskipTests`.
*/
object CometConfGenerateDocs {
def main(args: Array[String]): Unit = {
if (args.length != 2) {
// scalastyle:off println
println("Missing arguments for template file and output file")
// scalastyle:on println
sys.exit(-1)
}
val templateFilename = args.head
val outputFilename = args(1)
val w = new BufferedOutputStream(new FileOutputStream(outputFilename))
for (line <- Source.fromFile(templateFilename).getLines()) {
if (line.trim == "<!--CONFIG_TABLE-->") {
val publicConfigs = CometConf.allConfs.filter(_.isPublic)
val confs = publicConfigs.sortBy(_.key)
w.write("| Config | Description | Default Value |\n".getBytes)
w.write("|--------|-------------|---------------|\n".getBytes)
for (conf <- confs) {
w.write(s"| ${conf.key} | ${conf.doc.trim} | ${conf.defaultValueString} |\n".getBytes)
}
} else {
w.write(s"${line.trim}\n".getBytes)
}
}
w.close()
}
}
50 changes: 50 additions & 0 deletions docs/source/user-guide/compatibility-template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Compatibility Guide

Comet aims to provide consistent results with the version of Apache Spark that is being used.

This guide offers information about areas of functionality where there are known differences.

## ANSI mode

Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,
specify `spark.comet.ansi.enabled=true` in the Spark configuration. Comet's ANSI support is experimental and should not
be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

## Cast

Cast operations in Comet fall into three levels of support:

- **Compatible**: The results match Apache Spark
- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
`spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
recommended for production use.
- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
Spark.

The following table shows the current cast operations supported by Comet. Any cast that does not appear in this
table (such as those involving complex types and timestamp_ntz, for example) are not supported by Comet.

<!--CAST_TABLE-->
159 changes: 136 additions & 23 deletions docs/source/user-guide/compatibility.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Compatibility Guide
Expand All @@ -34,13 +34,126 @@ There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where

## Cast

Comet currently delegates to Apache DataFusion for most cast operations, and this means that the behavior is not
guaranteed to be consistent with Spark.
Cast operations in Comet fall into three levels of support:

There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions.
- **Compatible**: The results match Apache Spark
- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
`spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
recommended for production use.
- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
Spark.

### Cast from String to Timestamp
The following table shows the current cast operations supported by Comet. Any cast that does not appear in this
table (such as those involving complex types and timestamp_ntz, for example) are not supported by Comet.

Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone
issues, and can be enabled by setting `spark.comet.castStringToTimestamp=true`. See the
[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information.
| From Type | To Type | Compatible? | Notes |
| --------- | --------- | ------------ | ----------------------------------- |
| boolean | byte | Compatible | |
| boolean | short | Compatible | |
| boolean | integer | Compatible | |
| boolean | long | Compatible | |
| boolean | float | Compatible | |
| boolean | double | Compatible | |
| boolean | decimal | Unsupported | |
| boolean | string | Compatible | |
| boolean | timestamp | Unsupported | |
| byte | boolean | Compatible | |
| byte | short | Compatible | |
| byte | integer | Compatible | |
| byte | long | Compatible | |
| byte | float | Compatible | |
| byte | double | Compatible | |
| byte | decimal | Compatible | |
| byte | string | Compatible | |
| byte | binary | Unsupported | |
| byte | timestamp | Unsupported | |
| short | boolean | Compatible | |
| short | byte | Compatible | |
| short | integer | Compatible | |
| short | long | Compatible | |
| short | float | Compatible | |
| short | double | Compatible | |
| short | decimal | Compatible | |
| short | string | Compatible | |
| short | binary | Unsupported | |
| short | timestamp | Unsupported | |
| integer | boolean | Compatible | |
| integer | byte | Compatible | |
| integer | short | Compatible | |
| integer | long | Compatible | |
| integer | float | Compatible | |
| integer | double | Compatible | |
| integer | decimal | Compatible | |
| integer | string | Compatible | |
| integer | binary | Unsupported | |
| integer | timestamp | Unsupported | |
| long | boolean | Compatible | |
| long | byte | Compatible | |
| long | short | Compatible | |
| long | integer | Compatible | |
| long | float | Compatible | |
| long | double | Compatible | |
| long | decimal | Compatible | |
| long | string | Compatible | |
| long | binary | Unsupported | |
| long | timestamp | Unsupported | |
| float | boolean | Compatible | |
| float | byte | Unsupported | |
| float | short | Unsupported | |
| float | integer | Unsupported | |
| float | long | Unsupported | |
| float | double | Compatible | |
| float | decimal | Unsupported | |
| float | string | Incompatible | |
| float | timestamp | Unsupported | |
| double | boolean | Compatible | |
| double | byte | Unsupported | |
| double | short | Unsupported | |
| double | integer | Unsupported | |
| double | long | Unsupported | |
| double | float | Compatible | |
| double | decimal | Incompatible | |
| double | string | Incompatible | |
| double | timestamp | Unsupported | |
| decimal | boolean | Unsupported | |
| decimal | byte | Unsupported | |
| decimal | short | Unsupported | |
| decimal | integer | Unsupported | |
| decimal | long | Unsupported | |
| decimal | float | Compatible | |
| decimal | double | Compatible | |
| decimal | string | Unsupported | |
| decimal | timestamp | Unsupported | |
| string | boolean | Compatible | |
| string | byte | Compatible | |
| string | short | Compatible | |
| string | integer | Compatible | |
| string | long | Compatible | |
| string | float | Unsupported | |
| string | double | Unsupported | |
| string | decimal | Unsupported | |
| string | binary | Compatible | |
| string | date | Unsupported | |
| string | timestamp | Incompatible | Not all valid formats are supported |
| binary | string | Incompatible | |
| date | boolean | Unsupported | |
| date | byte | Unsupported | |
| date | short | Unsupported | |
| date | integer | Unsupported | |
| date | long | Unsupported | |
| date | float | Unsupported | |
| date | double | Unsupported | |
| date | decimal | Unsupported | |
| date | string | Compatible | |
| date | timestamp | Unsupported | |
| timestamp | boolean | Unsupported | |
| timestamp | byte | Unsupported | |
| timestamp | short | Unsupported | |
| timestamp | integer | Unsupported | |
| timestamp | long | Compatible | |
| timestamp | float | Unsupported | |
| timestamp | double | Unsupported | |
| timestamp | decimal | Unsupported | |
| timestamp | string | Compatible | |
| timestamp | date | Compatible | |
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Comet provides the following configuration settings.
|--------|-------------|---------------|
| spark.comet.ansi.enabled | Comet does not respect ANSI mode in most cases and by default will not accelerate queries when ansi mode is enabled. Enable this setting to test Comet's experimental support for ANSI mode. This should not be used in production. | false |
| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
| spark.comet.cast.stringToTimestamp | Comet is not currently fully compatible with Spark when casting from String to Timestamp. | false |
| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
Expand Down
13 changes: 7 additions & 6 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ under the License.
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down Expand Up @@ -270,17 +275,13 @@ under the License.
<version>3.2.0</version>
<executions>
<execution>
<id>generate-config-docs</id>
<id>generate-user-guide-reference-docs</id>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>org.apache.comet.CometConfGenerateDocs</mainClass>
<arguments>
<argument>docs/source/user-guide/configs-template.md</argument>
<argument>docs/source/user-guide/configs.md</argument>
</arguments>
<mainClass>org.apache.comet.GenerateDocs</mainClass>
<classpathScope>compile</classpathScope>
</configuration>
</execution>
Expand Down
Loading

0 comments on commit 2741ae7

Please sign in to comment.