@@ -165,8 +165,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com

COMET_NATIVE_SCAN_IMPL.get() match {
case SCAN_AUTO =>
- // TODO add support for native_datafusion in the future
- nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
+ nativeDataFusionScan(session, scanExec, r, hadoopConf)
+   .orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf))
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
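Note on the SCAN_AUTO change above: the branch now builds an Option-based fallback chain, preferring a validated native_datafusion scan, then native_iceberg_compat, then the original Spark scan. A minimal sketch of that chaining pattern, with simplified placeholder types and names rather than the real Comet signatures:

```scala
// Sketch only: the Option.orElse / getOrElse fallback chain used by SCAN_AUTO.
// `tryDataFusion` / `tryIcebergCompat` are hypothetical stand-ins for the
// nativeDataFusionScan / nativeIcebergCompatScan helpers.
object ScanAutoFallbackSketch extends App {
  def chooseScan(
      tryDataFusion: => Option[String],
      tryIcebergCompat: => Option[String],
      original: String): String =
    tryDataFusion                // prefer the full native_datafusion scan when it validates
      .orElse(tryIcebergCompat)  // otherwise try the native_iceberg_compat scan
      .getOrElse(original)       // otherwise keep the unconverted Spark scan

  // Example: DataFusion validation fails, the iceberg-compat scan succeeds.
  assert(chooseScan(None, Some("iceberg_compat_scan"), "spark_scan") == "iceberg_compat_scan")
}
```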
@@ -39,6 +39,7 @@ import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClas
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+ import org.apache.comet.shims.ShimFileFormat

/**
* Validation and serde logic for `native_datafusion` scans.
@@ -77,6 +78,22 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
}

+ if (scanExec.fileConstantMetadataColumns.nonEmpty) {
+   withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
+ }
+
+ if (CometParquetUtils.readFieldId(SQLConf.get)) {
+   withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID based reads")
+ }
+
+ if (scanExec.bucketedScan) {
+   withInfo(scanExec, "Native DataFusion scan does not support bucketed scans")
+ }
+
+ if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
+   withInfo(scanExec, "Native DataFusion scan does not support row index generation")
+ }

// the scan is supported if no fallback reasons were added to the node
!hasExplainInfo(scanExec)
}
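Note on the checks added above: they follow the file's existing validation pattern, where each unsupported feature records a fallback reason on the plan node via withInfo, and the scan is accepted only if no reason was recorded. A rough, self-contained sketch of that accumulate-then-decide pattern (a local ListBuffer stands in for the tag the real withInfo/hasExplainInfo helpers attach to the Spark plan node, and `Scan` is a hypothetical stand-in type):

```scala
import scala.collection.mutable.ListBuffer

// Sketch only: approximates the withInfo / hasExplainInfo pattern with local
// state instead of plan-node tags.
object ScanValidationSketch extends App {
  final case class Scan(bucketedScan: Boolean, hasMetadataColumns: Boolean)

  def isSupported(scan: Scan): Boolean = {
    val reasons = ListBuffer.empty[String] // fallback reasons recorded for this node
    def withInfo(reason: String): Unit = reasons += reason
    def hasExplainInfo: Boolean = reasons.nonEmpty

    if (scan.hasMetadataColumns) {
      withInfo("Native DataFusion scan does not support metadata columns")
    }
    if (scan.bucketedScan) {
      withInfo("Native DataFusion scan does not support bucketed scans")
    }
    // the scan is supported if no fallback reasons were added to the node
    !hasExplainInfo
  }

  assert(isSupported(Scan(bucketedScan = false, hasMetadataColumns = false)))
  assert(!isSupported(Scan(bucketedScan = true, hasMetadataColumns = false)))
}
```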
4 changes: 2 additions & 2 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -64,7 +64,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
private val timestampPattern = "0123456789/:T" + whitespaceChars

lazy val usingParquetExecWithIncompatTypes: Boolean =
- usingDataSourceExecWithIncompatTypes(conf)
+ hasUnsignedSmallIntSafetyCheck(conf)

test("all valid cast combinations covered") {
val names = testNames
@@ -1087,7 +1087,7 @@
|USING parquet
""".stripMargin)
sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
checkSparkAnswerAndOperator(
"SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
} else {
@@ -1509,7 +1509,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("round") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!usingDataSourceExec)
+ assume(usingLegacyNativeCometScan)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1573,7 +1573,7 @@

test("hex") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!usingDataSourceExec)
+ assume(usingLegacyNativeCometScan)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2607,7 +2607,7 @@
}

test("get_struct_field with DataFusion ParquetExec - read entire struct") {
- assume(usingDataSourceExec(conf))
+ assume(!usingLegacyNativeCometScan(conf))
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2644,7 +2644,7 @@
}

test("read array[int] from parquet") {
- assume(usingDataSourceExec(conf))
+ assume(!usingLegacyNativeCometScan(conf))

withTempPath { dir =>
// create input file with Comet disabled
@@ -29,7 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT count(distinct $col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}

@@ -45,7 +45,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT count(distinct $col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -57,7 +57,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}

@@ -73,7 +73,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -87,7 +87,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.columns) {
val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1 group by c1, c2, c3"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -99,7 +99,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.columns) {
val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -112,7 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
for (col <- df.columns.drop(1)) {
val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -126,7 +126,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
s"GROUP BY $groupCol ORDER BY $groupCol"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -138,7 +138,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
// cannot run fully native due to HashAggregate
val sql = s"SELECT min($col), max($col) FROM t1"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
14 changes: 7 additions & 7 deletions spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -37,7 +37,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1"
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
checkSparkAnswerAndOperator(sql)
} else {
checkSparkAnswer(sql)
@@ -59,7 +59,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
val sql = "SELECT * FROM t1 LIMIT 500"
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
checkSparkAnswerAndOperator(sql)
} else {
checkSparkAnswer(sql)
@@ -112,7 +112,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
s"alter table t2 add column col2 $defaultValueType default $defaultValueString")
// Verify that our default value matches Spark's answer
val sql = "select col2 from t2"
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
checkSparkAnswerAndOperator(sql)
} else {
checkSparkAnswer(sql)
@@ -139,7 +139,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val sql = s"SELECT $col FROM t1 ORDER BY $col"
// cannot run fully natively due to range partitioning and sort
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -152,7 +152,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
// cannot run fully natively due to range partitioning and sort
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(1 == collectNativeScans(cometPlan).length)
}
}
@@ -207,7 +207,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
val df = spark.read.parquet(filename)
val df2 = df.repartition(8, df.col("c0")).sort("c1")
df2.collect()
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match {
case CometConf.SCAN_NATIVE_COMET =>
@@ -233,7 +233,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
// cannot run fully native due to HashAggregate
val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
val (_, cometPlan) = checkSparkAnswer(sql)
- if (usingDataSourceExec) {
+ if (!usingLegacyNativeCometScan) {
assert(2 == collectNativeScans(cometPlan).length)
}
}
@@ -31,7 +31,7 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
class CometMapExpressionSuite extends CometTestBase {

test("read map[int, int] from parquet") {
- assume(usingDataSourceExec(conf))
+ assume(!usingLegacyNativeCometScan(conf))

withTempPath { dir =>
// create input file with Comet disabled
@@ -63,7 +63,7 @@ class CometMapExpressionSuite extends CometTestBase {

// repro for https://github.com/apache/datafusion-comet/issues/1754
test("read map[struct, struct] from parquet") {
- assume(usingDataSourceExec(conf))
+ assume(!usingLegacyNativeCometScan(conf))

withTempPath { dir =>
// create input file with Comet disabled
@@ -618,7 +618,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
// TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040
// and https://github.com/apache/arrow-rs/issues/7097
val fieldsToTest =
- if (usingDataSourceExec(conf)) {
+ if (!usingLegacyNativeCometScan(conf)) {
Seq(
$"_1",
$"_4",
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {

test("HashJoin struct key") {
// https://github.com/apache/datafusion-comet/issues/1441
- assume(!usingDataSourceExec)
+ assume(usingLegacyNativeCometScan)
withSQLConf(
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -92,7 +92,7 @@ abstract class ParquetReadSuite extends CometTestBase {
// for native iceberg compat, CometScanExec supports some types that native_comet does not.
// note that native_datafusion does not use CometScanExec so we need not include that in
// the check
- val isDataFusionScan = usingDataSourceExec(conf)
+ val isDataFusionScan = !usingLegacyNativeCometScan(conf)
Seq(
NullType -> false,
BooleanType -> true,
@@ -143,7 +143,7 @@ abstract class ParquetReadSuite extends CometTestBase {

// Arrays support for iceberg compat native and for Parquet V1
val cometScanExecSupported =
- if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
+ if (!usingLegacyNativeCometScan(conf) && this.isInstanceOf[ParquetReadV1Suite])
Seq(true, true, true)
else Seq(true, false, false)

@@ -185,7 +185,7 @@ abstract class ParquetReadSuite extends CometTestBase {
i.toDouble,
DateTimeUtils.toJavaDate(i))
}
- if (!usingDataSourceExecWithIncompatTypes(conf)) {
+ if (!hasUnsignedSmallIntSafetyCheck(conf)) {
checkParquetScan(data)
}
checkParquetFile(data)
@@ -207,7 +207,7 @@ abstract class ParquetReadSuite extends CometTestBase {
i.toDouble,
DateTimeUtils.toJavaDate(i))
}
- if (!usingDataSourceExecWithIncompatTypes(conf)) {
+ if (!hasUnsignedSmallIntSafetyCheck(conf)) {
checkParquetScan(data)
}
checkParquetFile(data)
@@ -228,7 +228,7 @@ abstract class ParquetReadSuite extends CometTestBase {
DateTimeUtils.toJavaDate(i))
}
val filter = (row: Row) => row.getBoolean(0)
- if (!usingDataSourceExecWithIncompatTypes(conf)) {
+ if (!hasUnsignedSmallIntSafetyCheck(conf)) {
checkParquetScan(data, filter)
}
checkParquetFile(data, filter)
@@ -1249,8 +1249,7 @@

withParquetDataFrame(data, schema = Some(readSchema)) { df =>
// TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
- if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
-     .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
+ if (enableSchemaEvolution || !usingLegacyNativeCometScan(conf)) {
checkAnswer(df, data.map(Row.fromTuple))
} else {
assertThrows[SparkException](df.collect())
@@ -1515,7 +1514,7 @@
test("row group skipping doesn't overflow when reading into larger type") {
// Spark 4.0 no longer fails for widening types SPARK-40876
// https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
- assume(!isSpark40Plus && !usingDataSourceExec(conf))
+ assume(!isSpark40Plus && usingLegacyNativeCometScan(conf))
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
13 changes: 6 additions & 7 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -591,7 +591,7 @@ abstract class CometTestBase
}

def getPrimitiveTypesParquetSchema: String = {
- if (usingDataSourceExecWithIncompatTypes(conf)) {
+ if (hasUnsignedSmallIntSafetyCheck(conf)) {
// Comet complex type reader has different behavior for uint_8, uint_16 types.
// The issue stems from undefined behavior in the parquet spec and is tracked
// here: https://github.com/apache/parquet-java/issues/3142
@@ -1268,14 +1268,13 @@ abstract class CometTestBase
writer.close()
}

- def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+ def usingLegacyNativeCometScan: Boolean = usingLegacyNativeCometScan(SQLConf.get)

- def usingDataSourceExec(conf: SQLConf): Boolean =
-   Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
-     CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
+ def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
+   CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET

- def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
-   usingDataSourceExec(conf) &&
+ def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = {
+   !usingLegacyNativeCometScan(conf) &&
CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
}
}
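Note on the renamed helpers above: the predicate is inverted rather than just renamed. The old usingDataSourceExec was true for the native_iceberg_compat and native_datafusion scans, while the new usingLegacyNativeCometScan is true only for the original native_comet scan, which is why call sites change from usingDataSourceExec to !usingLegacyNativeCometScan. A self-contained sketch of that equivalence over the three concrete scan implementations (plain strings stand in for the CometConf constants; the auto setting is left out of the sketch):

```scala
// Sketch only: shows the old and new predicates are complements for each
// concrete scan implementation. Strings stand in for CometConf constants.
object ScanPredicateSketch extends App {
  val NativeComet = "native_comet"
  val NativeIcebergCompat = "native_iceberg_compat"
  val NativeDataFusion = "native_datafusion"

  // old predicate: "are we on one of the DataSource-exec based scans?"
  def usingDataSourceExec(impl: String): Boolean =
    Seq(NativeIcebergCompat, NativeDataFusion).contains(impl)

  // new predicate: "are we on the legacy native_comet scan?"
  def usingLegacyNativeCometScan(impl: String): Boolean =
    impl == NativeComet

  for (impl <- Seq(NativeComet, NativeIcebergCompat, NativeDataFusion)) {
    assert(usingDataSourceExec(impl) == !usingLegacyNativeCometScan(impl))
  }
}
```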