diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index c5b6554054..7642749ad8 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -333,32 +333,6 @@ object IcebergReflection extends Logging {
     }
   }
 
-  /**
-   * Gets delete files from scan tasks.
-   *
-   * @param tasks
-   *   List of Iceberg FileScanTask objects
-   * @return
-   *   List of all delete files across all tasks
-   * @throws Exception
-   *   if reflection fails (callers must handle appropriately based on context)
-   */
-  def getDeleteFiles(tasks: java.util.List[_]): java.util.List[_] = {
-    import scala.jdk.CollectionConverters._
-    val allDeletes = new java.util.ArrayList[Any]()
-
-    // scalastyle:off classforname
-    val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
-    // scalastyle:on classforname
-
-    tasks.asScala.foreach { task =>
-      val deletes = getDeleteFilesFromTask(task, fileScanTaskClass)
-      allDeletes.addAll(deletes)
-    }
-
-    allDeletes
-  }
-
   /**
    * Gets delete files from a single FileScanTask.
    *
@@ -495,91 +469,6 @@ object IcebergReflection extends Logging {
     }
   }
 
-  /**
-   * Validates file formats and filesystem schemes for Iceberg tasks.
-   *
-   * Checks that all data files and delete files are Parquet format and use filesystem schemes
-   * supported by iceberg-rust (file, s3, s3a, gs, gcs, oss, abfss, abfs, wasbs, wasb).
-   *
-   * @param tasks
-   *   List of Iceberg FileScanTask objects
-   * @return
-   *   (allParquet, unsupportedSchemes) where: - allParquet: true if all files are Parquet format
-   *   \- unsupportedSchemes: Set of unsupported filesystem schemes found (empty if all supported)
-   */
-  def validateFileFormatsAndSchemes(tasks: java.util.List[_]): (Boolean, Set[String]) = {
-    import scala.jdk.CollectionConverters._
-
-    // scalastyle:off classforname
-    val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
-    val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
-    // scalastyle:on classforname
-
-    val fileMethod = contentScanTaskClass.getMethod("file")
-    val formatMethod = contentFileClass.getMethod("format")
-    val pathMethod = contentFileClass.getMethod("path")
-
-    // Filesystem schemes supported by iceberg-rust
-    // See: iceberg-rust/crates/iceberg/src/io/storage.rs parse_scheme()
-    val supportedSchemes =
-      Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb")
-
-    var allParquet = true
-    val unsupportedSchemes = scala.collection.mutable.Set[String]()
-
-    tasks.asScala.foreach { task =>
-      val dataFile = fileMethod.invoke(task)
-      val fileFormat = formatMethod.invoke(dataFile).toString
-
-      // Check file format
-      if (fileFormat != FileFormats.PARQUET) {
-        allParquet = false
-      } else {
-        // Only check filesystem schemes for Parquet files we'll actually process
-        try {
-          val filePath = pathMethod.invoke(dataFile).toString
-          val uri = new java.net.URI(filePath)
-          val scheme = uri.getScheme
-
-          if (scheme != null && !supportedSchemes.contains(scheme)) {
-            unsupportedSchemes += scheme
-          }
-        } catch {
-          case _: java.net.URISyntaxException =>
-          // Ignore URI parsing errors - file paths may contain special characters
-          // If the path is invalid, we'll fail later during actual file access
-        }
-
-        // Check delete files if they exist
-        try {
-          val deletesMethod = task.getClass.getMethod("deletes")
-          val deleteFiles = deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
-
-          deleteFiles.asScala.foreach { deleteFile =>
-            extractFileLocation(contentFileClass, deleteFile).foreach { deletePath =>
-              try {
-                val deleteUri = new java.net.URI(deletePath)
-                val deleteScheme = deleteUri.getScheme
-
-                if (deleteScheme != null && !supportedSchemes.contains(deleteScheme)) {
-                  unsupportedSchemes += deleteScheme
-                }
-              } catch {
-                case _: java.net.URISyntaxException =>
-                // Ignore URI parsing errors for delete files too
-              }
-            }
-          }
-        } catch {
-          case _: Exception =>
-          // Ignore errors accessing delete files - they may not be supported
-        }
-      }
-    }
-
-    (allParquet, unsupportedSchemes.toSet)
-  }
-
   /**
    * Validates partition column types for compatibility with iceberg-rust.
    *
@@ -643,68 +532,6 @@ object IcebergReflection extends Logging {
 
     unsupportedTypes.toList
   }
-
-  /**
-   * Checks if tasks have non-identity transforms in their residual expressions.
-   *
-   * Residual expressions are filters that must be evaluated after reading data from Parquet.
-   * iceberg-rust can only handle simple column references in residuals, not transformed columns.
-   * Transform functions like truncate, bucket, year, month, day, hour require evaluation by
-   * Spark.
-   *
-   * @param tasks
-   *   List of Iceberg FileScanTask objects
-   * @return
-   *   Some(transformType) if an unsupported transform is found (e.g., "truncate[4]"), None if all
-   *   transforms are identity or no transforms are present
-   * @throws Exception
-   *   if reflection fails - caller must handle appropriately (fallback in planning, fatal in
-   *   serialization)
-   */
-  def findNonIdentityTransformInResiduals(tasks: java.util.List[_]): Option[String] = {
-    import scala.jdk.CollectionConverters._
-
-    // scalastyle:off classforname
-    val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
-    val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
-    val unboundPredicateClass = Class.forName(ClassNames.UNBOUND_PREDICATE)
-    // scalastyle:on classforname
-
-    tasks.asScala.foreach { task =>
-      if (fileScanTaskClass.isInstance(task)) {
-        try {
-          val residualMethod = contentScanTaskClass.getMethod("residual")
-          val residual = residualMethod.invoke(task)
-
-          // Check if residual is an UnboundPredicate with a transform
-          if (unboundPredicateClass.isInstance(residual)) {
-            val termMethod = unboundPredicateClass.getMethod("term")
-            val term = termMethod.invoke(residual)
-
-            // Check if term has a transform
-            try {
-              val transformMethod = term.getClass.getMethod("transform")
-              transformMethod.setAccessible(true)
-              val transform = transformMethod.invoke(term)
-              val transformStr = transform.toString
-
-              // Only identity transform is supported in residuals
-              if (transformStr != Transforms.IDENTITY) {
-                return Some(transformStr)
-              }
-            } catch {
-              case _: NoSuchMethodException =>
-                // No transform method means it's a simple reference - OK
-            }
-          }
-        } catch {
-          case _: Exception =>
-            // Skip tasks where we can't get residual - they may not have one
-        }
-      }
-    }
-    None
-  }
 }
 
 /**
  *
@@ -783,10 +610,8 @@ object CometIcebergNativeScanMetadata extends Logging {
     val globalFieldIdMapping = buildFieldIdMapping(scanSchema)
 
     // File format is always PARQUET,
-    // validated in CometScanRule.validateFileFormatsAndSchemes()
+    // validated in CometScanRule.validateIcebergFileScanTasks()
     // Hardcoded here for extensibility (future ORC/Avro support would add logic here)
-    val fileFormat = FileFormats.PARQUET
-
     CometIcebergNativeScanMetadata(
       table = table,
       metadataLocation = metadataLocation,
@@ -796,7 +621,7 @@
       tableSchema = tableSchema,
       globalFieldIdMapping = globalFieldIdMapping,
       catalogProperties = catalogProperties,
-      fileFormat = fileFormat)
+      fileFormat = FileFormats.PARQUET)
     }
   }
 }
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index ebb5217300..66f382f067 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -467,20 +467,28 @@ case class CometScanRule(session: SparkSession)
         false
       }
 
-      // Check if all files are Parquet format and use supported filesystem schemes
-      val (allParquetFiles, unsupportedSchemes) =
-        IcebergReflection.validateFileFormatsAndSchemes(metadata.tasks)
+      // Single-pass validation of all FileScanTasks
+      val taskValidation =
+        try {
+          CometScanRule.validateIcebergFileScanTasks(metadata.tasks)
+        } catch {
+          case e: Exception =>
+            fallbackReasons += "Iceberg reflection failure: Could not validate " +
+              s"FileScanTasks: ${e.getMessage}"
+            return withInfos(scanExec, fallbackReasons.toSet)
+        }
 
-      val allSupportedFilesystems = if (unsupportedSchemes.isEmpty) {
+      // Check if all files are Parquet format and use supported filesystem schemes
+      val allSupportedFilesystems = if (taskValidation.unsupportedSchemes.isEmpty) {
         true
       } else {
         fallbackReasons += "Iceberg scan contains files with unsupported filesystem " +
-          s"schemes: ${unsupportedSchemes.mkString(", ")}. " +
+          s"schemes: ${taskValidation.unsupportedSchemes.mkString(", ")}. " +
           "Comet only supports: file, s3, s3a, gs, gcs, oss, abfss, abfs, wasbs, wasb"
         false
       }
 
-      if (!allParquetFiles) {
+      if (!taskValidation.allParquet) {
         fallbackReasons += "Iceberg scan contains non-Parquet files (ORC or Avro). " +
           "Comet only supports Parquet files in Iceberg tables"
       }
@@ -568,37 +576,24 @@ case class CometScanRule(session: SparkSession)
       // Check for unsupported transform functions in residual expressions
      // iceberg-rust can only handle identity transforms in residuals; all other transforms
       // (truncate, bucket, year, month, day, hour) must fall back to Spark
-      val transformFunctionsSupported =
-        try {
-          IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match {
-            case Some(transformType) =>
-              // Found unsupported transform
-              fallbackReasons +=
-                s"Iceberg transform function '$transformType' in residual expression " +
-                  "is not yet supported by iceberg-rust. " +
-                  "Only identity transforms are supported."
-              false
-            case None =>
-              // No unsupported transforms found - safe to use native execution
-              true
-          }
-        } catch {
-          case e: Exception =>
-            // Reflection failure - cannot verify safety, must fall back
-            fallbackReasons += "Iceberg reflection failure: Could not check for " +
-              s"transform functions in residuals: ${e.getMessage}"
-            false
-        }
+      val transformFunctionsSupported = taskValidation.nonIdentityTransform match {
+        case Some(transformType) =>
+          fallbackReasons +=
+            s"Iceberg transform function '$transformType' in residual expression " +
+              "is not yet supported by iceberg-rust. " +
+              "Only identity transforms are supported."
+          false
+        case None =>
+          true
+      }
 
       // Check for unsupported struct types in delete files
       val deleteFileTypesSupported = {
         var hasUnsupportedDeletes = false
         try {
-          val deleteFiles = IcebergReflection.getDeleteFiles(metadata.tasks)
-
-          if (!deleteFiles.isEmpty) {
-            deleteFiles.asScala.foreach { deleteFile =>
+          if (!taskValidation.deleteFiles.isEmpty) {
+            taskValidation.deleteFiles.asScala.foreach { deleteFile =>
               val equalityFieldIds = IcebergReflection.getEqualityFieldIds(deleteFile)
 
               if (!equalityFieldIds.isEmpty) {
@@ -671,8 +666,8 @@ case class CometScanRule(session: SparkSession)
         }
       }
 
-      if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles &&
-        allSupportedFilesystems && partitionTypesSupported &&
+      if (schemaSupported && fileIOCompatible && formatVersionSupported &&
+        taskValidation.allParquet && allSupportedFilesystems && partitionTypesSupported &&
         complexTypePredicatesSupported && transformFunctionsSupported &&
         deleteFileTypesSupported && dppSubqueriesSupported) {
         CometBatchScanExec(
@@ -854,4 +849,120 @@
     }
   }
+
+  /**
+   * Single-pass validation of Iceberg FileScanTasks.
+   *
+   * Consolidates file format, filesystem scheme, residual transform, and delete file checks into
+   * one iteration for better performance with large tables.
+   */
+  def validateIcebergFileScanTasks(tasks: java.util.List[_]): IcebergTaskValidationResult = {
+    // scalastyle:off classforname
+    val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
+    val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
+    val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
+    val unboundPredicateClass = Class.forName(IcebergReflection.ClassNames.UNBOUND_PREDICATE)
+    // scalastyle:on classforname
+
+    // Cache all method lookups outside the loop
+    val fileMethod = contentScanTaskClass.getMethod("file")
+    val formatMethod = contentFileClass.getMethod("format")
+    val pathMethod = contentFileClass.getMethod("path")
+    val residualMethod = contentScanTaskClass.getMethod("residual")
+    val deletesMethod = fileScanTaskClass.getMethod("deletes")
+    val termMethod = unboundPredicateClass.getMethod("term")
+
+    val supportedSchemes =
+      Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb")
+
+    var allParquet = true
+    val unsupportedSchemes = mutable.Set[String]()
+    var nonIdentityTransform: Option[String] = None
+    val deleteFiles = new java.util.ArrayList[Any]()
+
+    tasks.asScala.foreach { task =>
+      val dataFile = fileMethod.invoke(task)
+
+      // File format check
+      val fileFormat = formatMethod.invoke(dataFile).toString
+      if (fileFormat != IcebergReflection.FileFormats.PARQUET) {
+        allParquet = false
+      }
+
+      // Filesystem scheme check for data file
+      try {
+        val filePath = pathMethod.invoke(dataFile).toString
+        val uri = new URI(filePath)
+        val scheme = uri.getScheme
+        if (scheme != null && !supportedSchemes.contains(scheme)) {
+          unsupportedSchemes += scheme
+        }
+      } catch {
+        case _: java.net.URISyntaxException => // ignore
+      }
+
+      // Residual transform check (short-circuit if already found unsupported)
+      if (nonIdentityTransform.isEmpty && fileScanTaskClass.isInstance(task)) {
+        try {
+          val residual = residualMethod.invoke(task)
+          if (unboundPredicateClass.isInstance(residual)) {
+            val term = termMethod.invoke(residual)
+            try {
+              val transformMethod = term.getClass.getMethod("transform")
+              transformMethod.setAccessible(true)
+              val transform = transformMethod.invoke(term)
+              val transformStr = transform.toString
+              if (transformStr != IcebergReflection.Transforms.IDENTITY) {
+                nonIdentityTransform = Some(transformStr)
+              }
+            } catch {
+              case _: NoSuchMethodException => // No transform = simple reference, OK
+            }
+          }
+        } catch {
+          case _: Exception => // Skip tasks where we can't get residual
+        }
+      }
+
+      // Collect delete files and check their schemes
+      if (fileScanTaskClass.isInstance(task)) {
+        try {
+          val deletes = deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+          deleteFiles.addAll(deletes)
+
+          deletes.asScala.foreach { deleteFile =>
+            IcebergReflection.extractFileLocation(contentFileClass, deleteFile).foreach {
+              deletePath =>
+                try {
+                  val deleteUri = new URI(deletePath)
+                  val deleteScheme = deleteUri.getScheme
+                  if (deleteScheme != null && !supportedSchemes.contains(deleteScheme)) {
+                    unsupportedSchemes += deleteScheme
+                  }
+                } catch {
+                  case _: java.net.URISyntaxException => // ignore
+                }
+            }
+          }
+        } catch {
+          case _: Exception => // ignore errors accessing delete files
+        }
+      }
+    }
+
+    IcebergTaskValidationResult(
+      allParquet,
+      unsupportedSchemes.toSet,
+      nonIdentityTransform,
+      deleteFiles)
+  }
 }
+
+/**
+ * Result of single-pass validation over Iceberg FileScanTasks.
+ */
+case class IcebergTaskValidationResult(
+    allParquet: Boolean,
+    unsupportedSchemes: Set[String],
+    nonIdentityTransform: Option[String],
+    deleteFiles: java.util.List[_])
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala
index 036d526a41..2f3904c5df 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometOperatorSerdeBenchmark.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 
 import org.apache.comet.CometConf
+import org.apache.comet.rules.CometScanRule
 import org.apache.comet.serde.OperatorOuterClass
 import org.apache.comet.serde.operator.CometIcebergNativeScan
 
@@ -301,9 +302,72 @@
     }
   }
 
+  /**
+   * Benchmarks CometScanRule.apply() on Iceberg BatchScanExec plans.
+   *
+   * This measures the validation overhead when converting Spark Iceberg scans to Comet scans.
+   */
+  def icebergScanRuleBenchmark(numPartitions: Int): Unit = {
+    if (!icebergAvailable) {
+      // scalastyle:off println
+      println("Iceberg not available in classpath, skipping benchmark")
+      // scalastyle:on println
+      return
+    }
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.bench_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.bench_cat.type" -> "hadoop",
+        "spark.sql.catalog.bench_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create the partitioned table
+        createPartitionedIcebergTable(warehouseDir, numPartitions)
+
+        val fullTableName = "bench_cat.db.serde_bench_table"
+
+        // Get the sparkPlan (before post-hoc rules like CometScanRule)
+        val df = spark.sql(s"SELECT * FROM $fullTableName")
+        val sparkPlan = df.queryExecution.sparkPlan
+
+        // scalastyle:off println
+        println(s"SparkPlan class: ${sparkPlan.getClass.getSimpleName}")
+        // scalastyle:on println
+
+        val rule = CometScanRule(spark)
+        val iterations = 100
+
+        val benchmark = new Benchmark(
+          s"CometScanRule apply ($numPartitions partitions)",
+          iterations,
+          output = output)
+
+        benchmark.addCase("CometScanRule.apply(sparkPlan)") { _ =>
+          var i = 0
+          while (i < iterations) {
+            rule.apply(sparkPlan)
+            i += 1
+          }
+        }
+
+        benchmark.run()
+
+        // Cleanup
+        spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+      }
+    }
+  }
+
   override def runCometBenchmark(args: Array[String]): Unit = {
     val numPartitions = if (args.nonEmpty) args(0).toInt else 30000
 
+    runBenchmark("CometScanRule Benchmark") {
+      icebergScanRuleBenchmark(numPartitions)
+    }
+
     runBenchmark("IcebergScan Operator Serde Benchmark") {
       icebergScanSerdeBenchmark(numPartitions)
     }