11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -157,6 +157,17 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.schemaValidation.enabled")
.category(CATEGORY_PARQUET)
.doc(
"Whether to enable Spark-compatible schema validation when reading Parquet files " +
"with native_datafusion scan. When enabled, type coercions and column resolutions " +
"that Spark's vectorized reader would reject will also be rejected by Comet, " +
"throwing SparkException with compatible error messages.")
.booleanConf
.createWithDefault(true)

val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
.category(CATEGORY_PARQUET)
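For context, the new spark.comet.parquet.schemaValidation.enabled flag defaults to true and only takes effect for Comet's native_datafusion scan. Below is a minimal sketch of the behavior the flag controls; the file path and column names are illustrative, not taken from this PR. Reading an INT32 Parquet column through a LongType read schema is the kind of coercion Spark's vectorized reader rejects (see the SPARK-35640 tests touched in the diff below), so with validation enabled the scan is expected to fail with a SparkException rather than silently coerce.

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Validation is on by default; set explicitly here only for clarity.
spark.conf.set("spark.comet.parquet.schemaValidation.enabled", "true")

// Hypothetical file: column `a` is written as INT32.
val path = "/tmp/ints.parquet"
Seq(1, 2, 3).toDF("a").write.mode("overwrite").parquet(path)

// Asking the scan to read that column as LongType should be rejected,
// matching what Spark's vectorized Parquet reader does.
val readSchema = StructType(Seq(StructField("a", LongType)))
try {
  spark.read.schema(readSchema).parquet(path).collect()
} catch {
  case e: SparkException => println(s"rejected as expected: ${e.getMessage}")
}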
56 changes: 18 additions & 38 deletions dev/diffs/3.5.8.diff
@@ -923,7 +923,7 @@ index c26757c9cff..d55775f09d7 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3cf2bfd17ab..49728c35c42 100644
index 3cf2bfd17ab..78e679a2870 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -936,7 +936,17 @@ index 3cf2bfd17ab..49728c35c42 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4459,7 +4460,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -3127,7 +3128,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}

- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key ->
@@ -4459,7 +4461,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" +
@@ -946,7 +956,7 @@ index 3cf2bfd17ab..49728c35c42 100644
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4480,7 +4482,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -4480,7 +4483,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39175: Query context of Cast should be serialized to executors" +
Expand All @@ -956,7 +966,7 @@ index 3cf2bfd17ab..49728c35c42 100644
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4497,14 +4500,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -4497,14 +4501,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val msg = intercept[SparkException] {
sql(query).collect()
}.getMessage
@@ -2212,30 +2222,10 @@ index 8e88049f51e..49f2001dc6b 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 8ed9ef1630e..f312174b182 100644
index 8ed9ef1630e..eed2a6f5ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
val data = (1 to 4).map(i => Tuple1(i.toString))
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))

@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))

@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

@@ -2246,7 +2236,7 @@ index 8ed9ef1630e..f312174b182 100644
checkAnswer(
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index f6472ba3d9d..ce39ebb52e6 100644
index f6472ba3d9d..6ea893cdbe6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -2299,17 +2289,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}

- test("row group skipping doesn't overflow when reading into larger type") {
+ test("row group skipping doesn't overflow when reading into larger type",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -1148,7 +1153,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
.where(s"a < ${Long.MaxValue}")
.collect()
}
2 changes: 2 additions & 0 deletions native/core/src/execution/planner.rs
@@ -1053,6 +1053,8 @@ impl PhysicalPlanner {
default_values,
scan.session_timezone.as_str(),
scan.case_sensitive,
scan.schema_validation_enabled,
scan.schema_evolution_enabled,
self.session_ctx(),
scan.encryption_enabled,
)?;
3 changes: 3 additions & 0 deletions native/core/src/execution/spark_config.rs
@@ -21,6 +21,9 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
#[allow(dead_code)]
pub(crate) const COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: &str =
"spark.comet.parquet.schemaValidation.enabled";

pub(crate) trait SparkConfig {
fn get_bool(&self, name: &str) -> bool;
2 changes: 2 additions & 0 deletions native/core/src/parquet/mod.rs
@@ -773,6 +773,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
None,
session_timezone.as_str(),
case_sensitive != JNI_FALSE,
false, // schema_validation_enabled - validation is done on the Java side
false, // schema_evolution_enabled
session_ctx,
encryption_enabled,
)?;
8 changes: 8 additions & 0 deletions native/core/src/parquet/parquet_exec.rs
@@ -68,12 +68,16 @@ pub(crate) fn init_datasource_exec(
default_values: Option<HashMap<usize, ScalarValue>>,
session_timezone: &str,
case_sensitive: bool,
schema_validation_enabled: bool,
schema_evolution_enabled: bool,
session_ctx: &Arc<SessionContext>,
encryption_enabled: bool,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let (table_parquet_options, spark_parquet_options) = get_options(
session_timezone,
case_sensitive,
schema_validation_enabled,
schema_evolution_enabled,
&object_store_url,
encryption_enabled,
);
@@ -142,6 +146,8 @@ pub(crate) fn init_datasource_exec(
fn get_options(
session_timezone: &str,
case_sensitive: bool,
schema_validation_enabled: bool,
schema_evolution_enabled: bool,
object_store_url: &ObjectStoreUrl,
encryption_enabled: bool,
) -> (TableParquetOptions, SparkParquetOptions) {
@@ -153,6 +159,8 @@ fn get_options(
SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
spark_parquet_options.allow_cast_unsigned_ints = true;
spark_parquet_options.case_sensitive = case_sensitive;
spark_parquet_options.schema_validation_enabled = schema_validation_enabled;
spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;

if encryption_enabled {
table_parquet_options.crypto.configure_factory(
8 changes: 8 additions & 0 deletions native/core/src/parquet/parquet_support.rs
@@ -76,6 +76,10 @@ pub struct SparkParquetOptions {
pub use_legacy_date_timestamp_or_ntz: bool,
// Whether schema field names are case sensitive
pub case_sensitive: bool,
/// Whether to validate schema compatibility (type coercions) in a Spark-compatible way
pub schema_validation_enabled: bool,
/// Whether schema evolution (type widening) is enabled
pub schema_evolution_enabled: bool,
}

impl SparkParquetOptions {
@@ -88,6 +92,8 @@ impl SparkParquetOptions {
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
schema_validation_enabled: true,
schema_evolution_enabled: false,
}
}

@@ -100,6 +106,8 @@ impl SparkParquetOptions {
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
schema_validation_enabled: true,
schema_evolution_enabled: false,
}
}
}