15 changes: 13 additions & 2 deletions docs/source/user-guide/latest/iceberg.md
@@ -189,7 +189,9 @@ The native Iceberg reader supports the following features:

**Table specifications:**

- Iceberg table spec v1 and v2 (v3 will fall back to Spark)
- Iceberg table spec v1, v2, and v3 (basic read support)
- V3 tables that use only V2-compatible features are fully accelerated
- Scans of tables that use V3-specific features (Deletion Vectors, new data types) gracefully fall back to Spark (see the example below for checking a table's format version)
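
A quick way to check which spec version a table uses is through its table properties (a minimal check; the catalog and table names are placeholders, and the exact properties surfaced can vary by Iceberg version):

```scala
// The Iceberg Spark integration typically exposes the spec version as the
// "format-version" table property.
spark.sql("SHOW TBLPROPERTIES rest_cat.db.test_table").show(truncate = false)
// Expect a row like:  format-version | 3
```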

**Schema and data types:**

@@ -266,10 +268,19 @@ scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()

The following scenarios will fall back to Spark's native Iceberg reader:

- Iceberg table spec v3 scans
- Iceberg writes (reads are accelerated, writes use Spark)
- Tables backed by Avro or ORC data files (only Parquet is accelerated)
- Tables partitioned on `BINARY` or `DECIMAL` (with precision >28) columns
- Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour`
transform functions (partition pruning still works, but row-level filtering of these
transforms falls back)

**V3-specific limitations (graceful fallback to Spark):**

- Deletion Vectors (DVs) - V3's efficient bitmap-based deletes stored in Puffin files
- V3-only data types: `timestamp_ns`, `timestamptz_ns`, `variant`, `geometry`, `geography`
- Table encryption - tables with `encryption.key-id` or other encryption properties
- Default column values - schema fields with `initial-default` or `write-default`

Note: V3 tables that use only V2-compatible features (position deletes, equality deletes,
standard types, no encryption, no column defaults) are fully accelerated by Comet.
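
To confirm whether a particular query's Iceberg scan is accelerated or has fallen back, you can inspect the physical plan (a minimal check; the table name is a placeholder):

```scala
// An accelerated scan appears as a Comet scan operator in the physical plan
// (for example CometIcebergNativeScanExec); a fallback keeps Spark's BatchScanExec.
spark.sql("SELECT * FROM rest_cat.db.test_table").explain()
```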
243 changes: 243 additions & 0 deletions spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -558,6 +558,249 @@ object IcebergReflection extends Logging {

unsupportedTypes.toList
}

// ============================================================================
// Iceberg V3 Feature Detection
// ============================================================================

/** V3-only types not yet supported by Comet. */
object V3Types {
val TIMESTAMP_NS = "timestamp_ns"
val TIMESTAMPTZ_NS = "timestamptz_ns"
val VARIANT = "variant"
val GEOMETRY = "geometry"
val GEOGRAPHY = "geography"

val UNSUPPORTED_V3_TYPES: Set[String] =
Set(TIMESTAMP_NS, TIMESTAMPTZ_NS, VARIANT, GEOMETRY, GEOGRAPHY)
}

/** V3 Deletion Vector content type. */
object V3ContentTypes {
val DELETION_VECTOR = "DELETION_VECTOR"
}

/** Checks if any delete files use Deletion Vectors (V3 feature). */
def hasDeletionVectors(deleteFiles: java.util.List[_]): Boolean = {
import scala.jdk.CollectionConverters._

try {
// scalastyle:off classforname
val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
// scalastyle:on classforname
val contentMethod = contentFileClass.getMethod("content")

deleteFiles.asScala.exists { deleteFile =>
try {
contentMethod.invoke(deleteFile).toString == V3ContentTypes.DELETION_VECTOR
} catch {
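// A delete file whose content() cannot be read is treated as a non-DV delete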
case _: Exception => false
}
}
} catch {
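// If the Iceberg ContentFile class or its content() method is unavailable, assume no DVs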
case _: Exception => false
}
}

/** Finds V3-only types in a schema not yet supported by Comet. */
def findUnsupportedV3Types(schema: Any): Set[String] = {
import scala.jdk.CollectionConverters._

val unsupportedTypes = scala.collection.mutable.Set[String]()

try {
val columnsMethod = schema.getClass.getMethod("columns")
val columns = columnsMethod.invoke(schema).asInstanceOf[java.util.List[_]]

columns.asScala.foreach { column =>
try {
val typeMethod = column.getClass.getMethod("type")
val icebergType = typeMethod.invoke(column)
val typeStr = icebergType.toString.toLowerCase(java.util.Locale.ROOT)

V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}

checkNestedTypesForV3(icebergType, unsupportedTypes)
} catch {
case _: Exception => // Skip columns where we can't determine type
}
}
} catch {
case e: Exception =>
logWarning(s"Failed to scan schema for V3 types: ${e.getMessage}")
}

unsupportedTypes.toSet
}

/** Recursively checks nested types for V3-only types. */
private def checkNestedTypesForV3(
icebergType: Any,
unsupportedTypes: scala.collection.mutable.Set[String]): Unit = {
import scala.jdk.CollectionConverters._

try {
val typeClass = icebergType.getClass

if (typeClass.getSimpleName.contains("StructType")) {
try {
val fieldsMethod = typeClass.getMethod("fields")
val fields = fieldsMethod.invoke(icebergType).asInstanceOf[java.util.List[_]]

fields.asScala.foreach { field =>
try {
val fieldTypeMethod = field.getClass.getMethod("type")
val fieldType = fieldTypeMethod.invoke(field)
val typeStr = fieldType.toString.toLowerCase(java.util.Locale.ROOT)

V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}

checkNestedTypesForV3(fieldType, unsupportedTypes)
} catch {
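// Best-effort: skip struct fields whose type cannot be inspected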
case _: Exception =>
}
}
} catch {
case _: Exception =>
}
}

if (typeClass.getSimpleName.contains("ListType")) {
try {
val elementTypeMethod = typeClass.getMethod("elementType")
val elementType = elementTypeMethod.invoke(icebergType)
val typeStr = elementType.toString.toLowerCase(java.util.Locale.ROOT)

V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}

checkNestedTypesForV3(elementType, unsupportedTypes)
} catch {
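// Best-effort: skip list element types that cannot be inspected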
case _: Exception =>
}
}

if (typeClass.getSimpleName.contains("MapType")) {
try {
val keyTypeMethod = typeClass.getMethod("keyType")
val valueTypeMethod = typeClass.getMethod("valueType")
val keyType = keyTypeMethod.invoke(icebergType)
val valueType = valueTypeMethod.invoke(icebergType)

Seq(keyType, valueType).foreach { mapType =>
val typeStr = mapType.toString.toLowerCase(java.util.Locale.ROOT)
V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}
checkNestedTypesForV3(mapType, unsupportedTypes)
}
} catch {
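// Best-effort: skip map key/value types that cannot be inspected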
case _: Exception =>
}
}
} catch {
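// Nested type detection is best-effort; ignore any reflection failure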
case _: Exception =>
}
}

/** Checks if table has encryption configured (V3 feature). */
def hasEncryption(table: Any): Boolean = {
import scala.jdk.CollectionConverters._

try {
getTableProperties(table).exists { props =>
props.asScala.keys.exists { key =>
key.startsWith("encryption.") || key.startsWith("kms.")
}
}
} catch {
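// If table properties cannot be read via reflection, assume the table is not encrypted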
case _: Exception => false
}
}

/** Checks if schema has default column values (V3 feature). */
def hasDefaultColumnValues(schema: Any): Boolean = {
import scala.jdk.CollectionConverters._

try {
val columnsMethod = schema.getClass.getMethod("columns")
val columns = columnsMethod.invoke(schema).asInstanceOf[java.util.List[_]]

columns.asScala.exists { column =>
try {
hasFieldDefault(column) || hasNestedDefaults(column)
} catch {
case _: Exception => false
}
}
} catch {
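// If the schema cannot be inspected via reflection, assume no default column values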
case _: Exception => false
}
}

/** Checks if a field has initial-default or write-default set. */
private def hasFieldDefault(field: Any): Boolean = {
try {
val fieldClass = field.getClass

val hasInitialDefault =
try {
val initialDefaultMethod = fieldClass.getMethod("initialDefault")
initialDefaultMethod.invoke(field) != null
} catch {
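// initialDefault() does not exist on older Iceberg APIs without default value support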
case _: NoSuchMethodException => false
}

val hasWriteDefault =
try {
val writeDefaultMethod = fieldClass.getMethod("writeDefault")
writeDefaultMethod.invoke(field) != null
} catch {
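// writeDefault() does not exist on older Iceberg APIs without default value support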
case _: NoSuchMethodException => false
}

hasInitialDefault || hasWriteDefault
} catch {
case _: Exception => false
}
}

/** Recursively checks nested struct fields for defaults. */
private def hasNestedDefaults(field: Any): Boolean = {
import scala.jdk.CollectionConverters._

try {
val typeMethod = field.getClass.getMethod("type")
val fieldType = typeMethod.invoke(field)
val typeClass = fieldType.getClass

if (typeClass.getSimpleName.contains("StructType")) {
val fieldsMethod = typeClass.getMethod("fields")
val nestedFields = fieldsMethod.invoke(fieldType).asInstanceOf[java.util.List[_]]

nestedFields.asScala.exists { nestedField =>
hasFieldDefault(nestedField) || hasNestedDefaults(nestedField)
}
} else {
false
}
} catch {
case _: Exception => false
}
}
}

/**
73 changes: 63 additions & 10 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -437,10 +437,10 @@ case class CometScanRule(session: SparkSession)

val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) =>
if (formatVersion > 2) {
if (formatVersion > 3) {
fallbackReasons += "Iceberg table format version " +
s"$formatVersion is not supported. " +
"Comet only supports Iceberg table format V1 and V2"
"Comet supports Iceberg table format V1, V2, and V3"
false
} else {
true
@@ -612,18 +612,72 @@ case class CometScanRule(session: SparkSession)
!hasUnsupportedDeletes
}

val v3FeaturesSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) if formatVersion >= 3 =>
var allV3FeaturesSupported = true

try {
if (IcebergReflection.hasDeletionVectors(taskValidation.deleteFiles)) {
fallbackReasons += "Iceberg V3 Deletion Vectors are not yet supported. " +
"Tables using Deletion Vectors will fall back to Spark"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"Deletion Vectors: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
val unsupportedTypes = IcebergReflection.findUnsupportedV3Types(metadata.scanSchema)
if (unsupportedTypes.nonEmpty) {
fallbackReasons += "Iceberg V3 types not yet supported: " +
s"${unsupportedTypes.mkString(", ")}. " +
"Tables with these types will fall back to Spark"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"V3 types: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
Contributor Author: These checks may add a small slowdown, but the plan is to eventually support these V3 features and provide fully functional Iceberg V3 spec support.

if (IcebergReflection.hasEncryption(metadata.table)) {
fallbackReasons += "Iceberg table encryption is not yet supported"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"encryption: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
if (IcebergReflection.hasDefaultColumnValues(metadata.scanSchema)) {
fallbackReasons += "Iceberg default column values are not yet supported"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"default column values: ${e.getMessage}"
allV3FeaturesSupported = false
}

allV3FeaturesSupported
case _ => true
}

// Check that all DPP subqueries use InSubqueryExec which we know how to handle.
// Future Spark versions might introduce new subquery types we haven't tested.
val dppSubqueriesSupported = {
val unsupportedSubqueries = scanExec.runtimeFilters.collect {
case DynamicPruningExpression(e) if !e.isInstanceOf[InSubqueryExec] =>
e.getClass.getSimpleName
}
// Check for multi-index DPP which we don't support yet.
// SPARK-46946 changed SubqueryAdaptiveBroadcastExec from index: Int to indices: Seq[Int]
// as a preparatory refactor for future features (Null Safe Equality DPP, multiple
// equality predicates). Currently indices always has one element, but future Spark
// versions might use multiple indices.
val multiIndexDpp = scanExec.runtimeFilters.exists {
case DynamicPruningExpression(e: InSubqueryExec) =>
e.plan match {
@@ -639,7 +693,6 @@ case class CometScanRule(session: SparkSession)
"CometIcebergNativeScanExec only supports InSubqueryExec for DPP"
false
} else if (multiIndexDpp) {
// See SPARK-46946 for context on multi-index DPP
fallbackReasons +=
"Multi-index DPP (indices.length > 1) is not yet supported. " +
"See SPARK-46946 for context."
Expand All @@ -652,7 +705,7 @@ case class CometScanRule(session: SparkSession)
if (schemaSupported && fileIOCompatible && formatVersionSupported &&
taskValidation.allParquet && allSupportedFilesystems && partitionTypesSupported &&
complexTypePredicatesSupported && transformFunctionsSupported &&
deleteFileTypesSupported && dppSubqueriesSupported) {
deleteFileTypesSupported && v3FeaturesSupported && dppSubqueriesSupported) {
CometBatchScanExec(
scanExec.clone().asInstanceOf[BatchScanExec],
runtimeFilters = scanExec.runtimeFilters,