Merged

39 commits
ee1ccf8
Continue #3295, experimental DPP support.
mbutrovich Jan 30, 2026
70ad4f5
Remove unnecessary steps in convert(), hoist reflection calls out of …
mbutrovich Jan 30, 2026
e820616
scalastyle
mbutrovich Jan 30, 2026
b08abe0
Fix scenario with multiple DPP expressions (i.e., join on two partiti…
mbutrovich Jan 30, 2026
9dc84f9
Docs.
mbutrovich Jan 30, 2026
b3c7c79
Throw an exception on reflection error in setInSubqueryResult, strong…
mbutrovich Jan 30, 2026
a90f06d
Comments cleanup. Throw exception if column not found in subquery.
mbutrovich Jan 30, 2026
09066fb
Avoid capturing perPartitionByLocation in closure when:
mbutrovich Jan 30, 2026
4ea7c78
Remove IcebergFilePartition from proto and clean up native code now t…
mbutrovich Jan 30, 2026
9eb95d7
Use sab.index and sab.buildKeys with exprId matching (handles renamed…
mbutrovich Jan 31, 2026
0fd297e
Simplify matching logic for SubqueryAdaptiveBroadcastExec expressions…
mbutrovich Jan 31, 2026
8f7b29d
add shim for Spark 4.0 SAB API change (indices instead of index), add…
mbutrovich Jan 31, 2026
da27f5f
add Spark 3.4 shim, whoops
mbutrovich Jan 31, 2026
4af89b2
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Jan 31, 2026
97f3693
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 1, 2026
43a56e5
Comment.
mbutrovich Feb 1, 2026
bac6d66
Refactor down to just one CometExecRDD. Let's see how CI goes.
mbutrovich Feb 1, 2026
e73cca0
Fix spotless.
mbutrovich Feb 1, 2026
95c4e6d
Fix broadcast with DPP?
mbutrovich Feb 1, 2026
67c8bdb
Minor refactor for variable names, comments.
mbutrovich Feb 1, 2026
aa048a7
Fix scalastyle.
mbutrovich Feb 1, 2026
02a52a3
cache parsed commonData.
mbutrovich Feb 1, 2026
9cc541a
Address PR feedback.
mbutrovich Feb 2, 2026
bd70924
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 4, 2026
b10ed64
add test
mbutrovich Feb 5, 2026
b54c87a
Address PR feedback.
mbutrovich Feb 5, 2026
51f8c42
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 5, 2026
268cc00
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 6, 2026
6d62175
upmerge main, resolve conflicts, format
mbutrovich Feb 6, 2026
82e4513
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 6, 2026
f0720f5
Add docs to try to address PR feedback.
mbutrovich Feb 6, 2026
212ebef
Minor refactor for readability. spotless:apply
mbutrovich Feb 6, 2026
4e13899
Add LRU cache to IcebergPlanDataInjector.
mbutrovich Feb 7, 2026
fb10fb5
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 7, 2026
96af2a2
Clean up imports.
mbutrovich Feb 7, 2026
6bec0b6
Single pass FileScanTask validation.
mbutrovich Feb 6, 2026
ac5e2ed
Add benchmark.
mbutrovich Feb 7, 2026
c20c3e5
Merge branch 'main' into faster_filescantask
mbutrovich Feb 8, 2026
462552d
Fix exception behavior.
mbutrovich Feb 8, 2026
179 changes: 2 additions & 177 deletions spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -333,32 +333,6 @@ object IcebergReflection extends Logging {
}
}

/**
* Gets delete files from scan tasks.
*
* @param tasks
* List of Iceberg FileScanTask objects
* @return
* List of all delete files across all tasks
* @throws Exception
* if reflection fails (callers must handle appropriately based on context)
*/
def getDeleteFiles(tasks: java.util.List[_]): java.util.List[_] = {
import scala.jdk.CollectionConverters._
val allDeletes = new java.util.ArrayList[Any]()

// scalastyle:off classforname
val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
// scalastyle:on classforname

tasks.asScala.foreach { task =>
val deletes = getDeleteFilesFromTask(task, fileScanTaskClass)
allDeletes.addAll(deletes)
}

allDeletes
}
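An illustrative sketch (not part of the diff) of the reflection pattern the removed helper relied on: look up FileScanTask's deletes() accessor once, then invoke it per task and accumulate the results. The fully qualified class name below is an assumption standing in for ClassNames.FILE_SCAN_TASK.

import scala.jdk.CollectionConverters._

def collectDeleteFiles(tasks: java.util.List[_]): java.util.List[Any] = {
  val allDeletes = new java.util.ArrayList[Any]()
  // Assumed FQCN behind ClassNames.FILE_SCAN_TASK
  val fileScanTaskClass = Class.forName("org.apache.iceberg.FileScanTask")
  val deletesMethod = fileScanTaskClass.getMethod("deletes")
  tasks.asScala.foreach { task =>
    val deletes = deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
    allDeletes.addAll(deletes)
  }
  allDeletes
}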

/**
* Gets delete files from a single FileScanTask.
*
@@ -495,91 +469,6 @@ object IcebergReflection extends Logging {
}
}

/**
* Validates file formats and filesystem schemes for Iceberg tasks.
*
* Checks that all data files and delete files are Parquet format and use filesystem schemes
* supported by iceberg-rust (file, s3, s3a, gs, gcs, oss, abfss, abfs, wasbs, wasb).
*
* @param tasks
* List of Iceberg FileScanTask objects
* @return
* (allParquet, unsupportedSchemes) where: - allParquet: true if all files are Parquet format
* \- unsupportedSchemes: Set of unsupported filesystem schemes found (empty if all supported)
*/
def validateFileFormatsAndSchemes(tasks: java.util.List[_]): (Boolean, Set[String]) = {
import scala.jdk.CollectionConverters._

// scalastyle:off classforname
val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
// scalastyle:on classforname

val fileMethod = contentScanTaskClass.getMethod("file")
val formatMethod = contentFileClass.getMethod("format")
val pathMethod = contentFileClass.getMethod("path")

// Filesystem schemes supported by iceberg-rust
// See: iceberg-rust/crates/iceberg/src/io/storage.rs parse_scheme()
val supportedSchemes =
Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb")

var allParquet = true
val unsupportedSchemes = scala.collection.mutable.Set[String]()

tasks.asScala.foreach { task =>
val dataFile = fileMethod.invoke(task)
val fileFormat = formatMethod.invoke(dataFile).toString

// Check file format
if (fileFormat != FileFormats.PARQUET) {
allParquet = false
} else {
// Only check filesystem schemes for Parquet files we'll actually process
try {
val filePath = pathMethod.invoke(dataFile).toString
val uri = new java.net.URI(filePath)
val scheme = uri.getScheme

if (scheme != null && !supportedSchemes.contains(scheme)) {
unsupportedSchemes += scheme
}
} catch {
case _: java.net.URISyntaxException =>
// Ignore URI parsing errors - file paths may contain special characters
// If the path is invalid, we'll fail later during actual file access
}

// Check delete files if they exist
try {
val deletesMethod = task.getClass.getMethod("deletes")
val deleteFiles = deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]

deleteFiles.asScala.foreach { deleteFile =>
extractFileLocation(contentFileClass, deleteFile).foreach { deletePath =>
try {
val deleteUri = new java.net.URI(deletePath)
val deleteScheme = deleteUri.getScheme

if (deleteScheme != null && !supportedSchemes.contains(deleteScheme)) {
unsupportedSchemes += deleteScheme
}
} catch {
case _: java.net.URISyntaxException =>
// Ignore URI parsing errors for delete files too
}
}
}
} catch {
case _: Exception =>
// Ignore errors accessing delete files - they may not be supported
}
}
}

(allParquet, unsupportedSchemes.toSet)
}
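The scheme validation above reduces to parsing each file location as a URI and comparing its scheme against the iceberg-rust allow-list. A self-contained sketch of that core check (illustrative only, not code from the PR):

val supportedSchemes =
  Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb")

def unsupportedSchemeOf(location: String): Option[String] =
  try {
    // A null scheme (bare path) is treated as supported, matching the method above.
    Option(new java.net.URI(location).getScheme).filterNot(supportedSchemes.contains)
  } catch {
    // Invalid URIs are ignored; such paths fail later during actual file access.
    case _: java.net.URISyntaxException => None
  }

// unsupportedSchemeOf("hdfs://nn/warehouse/db/t/data.parquet")  == Some("hdfs")
// unsupportedSchemeOf("s3://bucket/warehouse/db/t/data.parquet") == None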

/**
* Validates partition column types for compatibility with iceberg-rust.
*
@@ -643,68 +532,6 @@ object IcebergReflection extends Logging {

unsupportedTypes.toList
}

/**
* Checks if tasks have non-identity transforms in their residual expressions.
*
* Residual expressions are filters that must be evaluated after reading data from Parquet.
* iceberg-rust can only handle simple column references in residuals, not transformed columns.
* Transform functions like truncate, bucket, year, month, day, hour require evaluation by
* Spark.
*
* @param tasks
* List of Iceberg FileScanTask objects
* @return
* Some(transformType) if an unsupported transform is found (e.g., "truncate[4]"), None if all
* transforms are identity or no transforms are present
* @throws Exception
* if reflection fails - caller must handle appropriately (fallback in planning, fatal in
* serialization)
*/
def findNonIdentityTransformInResiduals(tasks: java.util.List[_]): Option[String] = {
import scala.jdk.CollectionConverters._

// scalastyle:off classforname
val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
val unboundPredicateClass = Class.forName(ClassNames.UNBOUND_PREDICATE)
// scalastyle:on classforname

tasks.asScala.foreach { task =>
if (fileScanTaskClass.isInstance(task)) {
try {
val residualMethod = contentScanTaskClass.getMethod("residual")
val residual = residualMethod.invoke(task)

// Check if residual is an UnboundPredicate with a transform
if (unboundPredicateClass.isInstance(residual)) {
val termMethod = unboundPredicateClass.getMethod("term")
val term = termMethod.invoke(residual)

// Check if term has a transform
try {
val transformMethod = term.getClass.getMethod("transform")
transformMethod.setAccessible(true)
val transform = transformMethod.invoke(term)
val transformStr = transform.toString

// Only identity transform is supported in residuals
if (transformStr != Transforms.IDENTITY) {
return Some(transformStr)
}
} catch {
case _: NoSuchMethodException =>
// No transform method means it's a simple reference - OK
}
}
} catch {
case _: Exception =>
// Skip tasks where we can't get residual - they may not have one
}
}
}
None
}
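A compact sketch of the residual check above (illustrative, not from the PR): given the predicate's term, read its transform via reflection and accept only the identity transform. Transforms.IDENTITY is assumed to be the string "identity".

def nonIdentityTransform(term: AnyRef): Option[String] =
  try {
    val transformMethod = term.getClass.getMethod("transform")
    transformMethod.setAccessible(true)
    val transformStr = transformMethod.invoke(term).toString
    // e.g. "truncate[4]", "bucket[16]", "year" are rejected; "identity" is allowed
    if (transformStr != "identity") Some(transformStr) else None
  } catch {
    // No transform() method means the term is a plain column reference, which is fine.
    case _: NoSuchMethodException => None
  }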
}

/**
@@ -783,10 +610,8 @@ object CometIcebergNativeScanMetadata extends Logging {
val globalFieldIdMapping = buildFieldIdMapping(scanSchema)

// File format is always PARQUET,
// validated in CometScanRule.validateFileFormatsAndSchemes()
// validated in CometScanRule.validateIcebergFileScanTasks()
// Hardcoded here for extensibility (future ORC/Avro support would add logic here)
val fileFormat = FileFormats.PARQUET

CometIcebergNativeScanMetadata(
table = table,
metadataLocation = metadataLocation,
@@ -796,7 +621,7 @@
tableSchema = tableSchema,
globalFieldIdMapping = globalFieldIdMapping,
catalogProperties = catalogProperties,
fileFormat = fileFormat)
fileFormat = FileFormats.PARQUET)
}
}
}
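For what the "hardcoded here for extensibility" comment alludes to, a hypothetical sketch (not in the PR) of how the constant could become a dispatch if ORC/Avro support were ever added; the ORC/AVRO constants shown are assumptions:

def resolveFileFormat(formatName: String): String = formatName match {
  case "PARQUET" => FileFormats.PARQUET
  // case "ORC"  => FileFormats.ORC   // hypothetical, not supported today
  // case "AVRO" => FileFormats.AVRO  // hypothetical, not supported today
  case other =>
    throw new UnsupportedOperationException(s"Unsupported Iceberg file format: $other")
}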