diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/DeltaParquetFileFormat.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/DeltaParquetFileFormat.java
new file mode 100644
index 00000000000..1dab7d14c71
--- /dev/null
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/DeltaParquetFileFormat.java
@@ -0,0 +1,352 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.read;
+
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray;
+import io.delta.kernel.internal.util.ColumnMapping;
+import io.delta.kernel.internal.util.Tuple2;
+import io.delta.kernel.spark.utils.SchemaUtils;
+import io.delta.kernel.spark.utils.SerializableKernelRowWrapper;
+import io.delta.kernel.types.StructType;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.PartitionedFile;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarBatchRow;
+import scala.Function1;
+import scala.collection.Iterator;
+
+/**
+ * A Delta-aware Parquet file format that supports: - Column Mapping (ID and Name modes) - Deletion
+ * Vectors
+ *
+ * <p>This format wraps Spark's ParquetFileFormat and adds Delta-specific transformations using
+ * Kernel APIs.
+ */
+public class DeltaParquetFileFormat extends ParquetFileFormat {
+
+ // Metadata key for deletion vector descriptor
+ public static final String DV_DESCRIPTOR_KEY = "__delta_dv_descriptor";
+
+ // Serializable wrappers for Protocol and Metadata
+ private final SerializableKernelRowWrapper protocolWrapper;
+ private final SerializableKernelRowWrapper metadataWrapper;
+ private final String tablePath;
+ private final ColumnMapping.ColumnMappingMode columnMappingMode;
+
+ public DeltaParquetFileFormat(
+ Engine kernelEngine, Protocol protocol, Metadata metadata, String tablePath) {
+ Objects.requireNonNull(kernelEngine, "kernelEngine is null");
+ Objects.requireNonNull(protocol, "protocol is null");
+ Objects.requireNonNull(metadata, "metadata is null");
+ this.tablePath = Objects.requireNonNull(tablePath, "tablePath is null");
+
+ // Wrap Protocol and Metadata in serializable wrappers
+ this.protocolWrapper = new SerializableKernelRowWrapper(protocol.toRow());
+ this.metadataWrapper = new SerializableKernelRowWrapper(metadata.toRow());
+ this.columnMappingMode = ColumnMapping.getColumnMappingMode(metadata.getConfiguration());
+ }
+
+ /** Get Metadata from wrapper */
+ private Metadata getMetadata() {
+ return Metadata.fromRow(metadataWrapper.getRow());
+ }
+
+ public Function1<PartitionedFile, Iterator<InternalRow>> buildReaderWithPartitionValues(
+ SparkSession sparkSession,
+ org.apache.spark.sql.types.StructType dataSchema,
+ org.apache.spark.sql.types.StructType partitionSchema,
+ org.apache.spark.sql.types.StructType requiredSchema,
+ scala.collection.Seq<Filter> filters,
+ scala.collection.immutable.Map<String, String> options,
+ Configuration hadoopConf) {
+
+ // Step 1: Convert logical schema to physical schema using Kernel API
+ org.apache.spark.sql.types.StructType physicalDataSchema = convertToPhysicalSchema(dataSchema);
+ org.apache.spark.sql.types.StructType physicalPartitionSchema =
+ convertToPhysicalSchema(partitionSchema);
+ org.apache.spark.sql.types.StructType physicalRequiredSchema =
+ convertToPhysicalSchema(requiredSchema);
+
+ // Step 2: Translate filters to use physical column names
+ scala.collection.immutable.Seq<Filter> physicalFilters =
+ columnMappingMode == ColumnMapping.ColumnMappingMode.NONE
+ ? convertFiltersToImmutable(filters)
+ : io.delta.kernel.spark.utils.ExpressionUtils.convertFiltersToPhysicalNames(
+ filters, dataSchema, physicalDataSchema);
+
+ // Step 3: Build standard Parquet reader with physical schema
+ Function1<PartitionedFile, Iterator<InternalRow>> baseReader =
+ super.buildReaderWithPartitionValues(
+ sparkSession,
+ physicalDataSchema,
+ physicalPartitionSchema,
+ physicalRequiredSchema,
+ physicalFilters,
+ options,
+ hadoopConf);
+
+ // Step 4: Wrap reader to apply deletion vector filtering
+ return (PartitionedFile file) -> {
+ Iterator<InternalRow> baseIterator = baseReader.apply(file);
+ return applyDeletionVectorIfNeeded(file, baseIterator, hadoopConf);
+ };
+ }
+
+ /** Convert logical Spark schema to physical schema using Kernel's ColumnMapping utilities. */
+ private org.apache.spark.sql.types.StructType convertToPhysicalSchema(
+ org.apache.spark.sql.types.StructType logicalSchema) {
+ if (columnMappingMode == ColumnMapping.ColumnMappingMode.NONE) {
+ return logicalSchema;
+ }
+
+ Metadata metadata = getMetadata();
+
+ // Convert Spark StructType to Kernel StructType
+ StructType kernelLogicalSchema = SchemaUtils.convertSparkSchemaToKernelSchema(logicalSchema);
+ StructType kernelFullSchema = metadata.getSchema();
+
+ // Use Kernel API to convert to physical schema
+ StructType kernelPhysicalSchema =
+ ColumnMapping.convertToPhysicalSchema(
+ kernelLogicalSchema, kernelFullSchema, columnMappingMode);
+
+ // Convert back to Spark StructType
+ return SchemaUtils.convertKernelSchemaToSparkSchema(kernelPhysicalSchema);
+ }
+
+ /** Convert Seq to immutable Seq for compatibility. */
+ private scala.collection.immutable.Seq<Filter> convertFiltersToImmutable(
+ scala.collection.Seq<Filter> filters) {
+ if (filters instanceof scala.collection.immutable.Seq) {
+ return (scala.collection.immutable.Seq<Filter>) filters;
+ }
+ return scala.collection.JavaConverters.asScalaBuffer(
+ scala.collection.JavaConverters.seqAsJavaList(filters))
+ .toSeq();
+ }
+
+ /**
+ * Apply deletion vector filtering if present. Supports both vectorized (ColumnarBatch) and
+ * non-vectorized (InternalRow) data from Parquet reader.
+ */
+ @SuppressWarnings("unchecked")
+ private Iterator<InternalRow> applyDeletionVectorIfNeeded(
+ PartitionedFile file, Iterator<InternalRow> dataIterator, Configuration hadoopConf) {
+
+ Optional<DeletionVectorDescriptor> dvDescriptorOpt = extractDeletionVectorDescriptor(file);
+
+ if (!dvDescriptorOpt.isPresent()) {
+ return dataIterator;
+ }
+
+ // Load deletion vector using Kernel API
+ RoaringBitmapArray deletionVector = loadDeletionVector(dvDescriptorOpt.get(), hadoopConf);
+
+ // Filter out deleted rows - handle both vectorized and row-based data
+ // Cast to Iterator<Object> since Parquet may return ColumnarBatch or InternalRow
+ Iterator<Object> objectIterator = (Iterator<Object>) (Iterator<?>) dataIterator;
+ return new DeletionVectorFilterIterator(objectIterator, deletionVector);
+ }
+
+ /** Extract deletion vector descriptor from PartitionedFile metadata. */
+ private Optional<DeletionVectorDescriptor> extractDeletionVectorDescriptor(PartitionedFile file) {
+ scala.collection.immutable.Map<String, Object> metadata =
+ file.otherConstantMetadataColumnValues();
+
+ scala.Option<Object> dvOption = metadata.get(DV_DESCRIPTOR_KEY);
+ if (dvOption.isDefined()) {
+ Object dvObj = dvOption.get();
+ if (dvObj instanceof DeletionVectorDescriptor) {
+ return Optional.of((DeletionVectorDescriptor) dvObj);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /** Load deletion vector bitmap using Kernel API. */
+ private RoaringBitmapArray loadDeletionVector(
+ DeletionVectorDescriptor dvDescriptor, Configuration hadoopConf) {
+ try {
+ // Create a new engine for this task
+ Engine engine = io.delta.kernel.defaults.engine.DefaultEngine.create(hadoopConf);
+ Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> result =
+ io.delta.kernel.internal.deletionvectors.DeletionVectorUtils.loadNewDvAndBitmap(
+ engine, tablePath, dvDescriptor);
+ return result._2;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to load deletion vector", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (!(other instanceof DeltaParquetFileFormat)) return false;
+
+ DeltaParquetFileFormat that = (DeltaParquetFileFormat) other;
+ return Objects.equals(this.tablePath, that.tablePath)
+ && Objects.equals(this.columnMappingMode, that.columnMappingMode);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tablePath, columnMappingMode);
+ }
+
+ /**
+ * Iterator that filters out rows marked as deleted in the deletion vector. Supports both
+ * vectorized (ColumnarBatch) and non-vectorized (InternalRow) data.
+ */
+ private static class DeletionVectorFilterIterator
+ extends scala.collection.AbstractIterator<InternalRow> {
+ private final Iterator<Object> underlying;
+ private final RoaringBitmapArray deletionVector;
+ private long currentRowIndex = 0;
+
+ // For handling ColumnarBatch - use Scala Iterator
+ private scala.collection.Iterator<InternalRow> currentBatchIterator = null;
+
+ // Type handlers map for processing different data formats
+ private final Map<Class<?>, Function<Object, InternalRow>> typeHandlers;
+
+ DeletionVectorFilterIterator(Iterator<Object> underlying, RoaringBitmapArray deletionVector) {
+ this.underlying = underlying;
+ this.deletionVector = deletionVector;
+
+ // Initialize type handlers
+ this.typeHandlers = new HashMap<>();
+ typeHandlers.put(ColumnarBatch.class, this::handleColumnarBatch);
+ typeHandlers.put(ColumnarBatchRow.class, this::handleColumnarBatchRow);
+ typeHandlers.put(InternalRow.class, this::handleInternalRow);
+ }
+
+ @Override
+ public boolean hasNext() {
+ // First check if we have rows from current batch
+ if (currentBatchIterator != null && currentBatchIterator.hasNext()) {
+ return true;
+ }
+
+ // Try to get next batch or row
+ return underlying.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ // If we have rows from current batch, return next one
+ if (currentBatchIterator != null && currentBatchIterator.hasNext()) {
+ return currentBatchIterator.next();
+ }
+
+ // Get next item from underlying iterator
+ Object next = underlying.next();
+
+ // Use type handlers map to process different data formats
+ Function<Object, InternalRow> handler = typeHandlers.get(next.getClass());
+ if (handler != null) {
+ return handler.apply(next);
+ } else {
+ throw new RuntimeException(
+ "Unexpected row type from Parquet reader: " + next.getClass().getName());
+ }
+ }
+
+ /** Handle vectorized ColumnarBatch data */
+ private InternalRow handleColumnarBatch(Object obj) {
+ ColumnarBatch batch = (ColumnarBatch) obj;
+ List<InternalRow> filteredRows = filterColumnarBatch(batch);
+ // Convert Java Iterator to Scala Iterator
+ currentBatchIterator =
+ scala.collection.JavaConverters.asScalaIterator(filteredRows.iterator());
+ return currentBatchIterator.next();
+ }
+
+ /**
+ * Handle ColumnarBatchRow - vectorized reader enabled but returns immutable rows. This is not
+ * efficient and should only affect wide tables.
+ */
+ private InternalRow handleColumnarBatchRow(Object obj) {
+ ColumnarBatchRow columnarRow = (ColumnarBatchRow) obj;
+ // Need to copy the row since ColumnarBatchRow is immutable
+ InternalRow row = columnarRow.copy();
+ // Filter out deleted rows
+ while (deletionVector.contains(currentRowIndex)) {
+ currentRowIndex++;
+ if (!underlying.hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Object next = underlying.next();
+ if (next instanceof ColumnarBatchRow) {
+ row = ((ColumnarBatchRow) next).copy();
+ } else {
+ row = (InternalRow) next;
+ }
+ }
+ currentRowIndex++;
+ return row;
+ }
+
+ /** Handle non-vectorized InternalRow data */
+ private InternalRow handleInternalRow(Object obj) {
+ InternalRow row = (InternalRow) obj;
+ // Filter out deleted rows
+ while (deletionVector.contains(currentRowIndex)) {
+ currentRowIndex++;
+ if (!underlying.hasNext()) {
+ throw new NoSuchElementException();
+ }
+ row = (InternalRow) underlying.next();
+ }
+ currentRowIndex++;
+ return row;
+ }
+
+ /** Filter ColumnarBatch by deletion vector. Returns list of non-deleted rows. */
+ private List<InternalRow> filterColumnarBatch(ColumnarBatch batch) {
+ List<InternalRow> result = new ArrayList<>();
+ int numRows = batch.numRows();
+
+ for (int i = 0; i < numRows; i++) {
+ if (!deletionVector.contains(currentRowIndex + i)) {
+ result.add(batch.getRow(i).copy());
+ }
+ }
+
+ currentRowIndex += numRows;
+ return result;
+ }
+ }
+}
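The deletion-vector handling above comes down to row-index bookkeeping: every row of a Parquet file has a file-relative index, and a row is emitted only if that index is absent from the file's deletion vector. The following plain-Java sketch captures just that contract (a java.util.Set<Long> stands in for RoaringBitmapArray, a java.util.Iterator for the Scala iterator, and the ColumnarBatch/ColumnarBatchRow dispatch is omitted; all names are illustrative, not part of the patch):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

// Simplified model of DeletionVectorFilterIterator: rows carry a file-relative
// index, and a row survives only if its index is not in the deletion vector.
final class DvFilterSketch {
  static <T> Iterator<T> filterDeleted(Iterator<T> rows, Set<Long> deletedRowIndexes) {
    return new Iterator<T>() {
      private long rowIndex = 0; // mirrors currentRowIndex above
      private T pending;
      private boolean ready;

      @Override
      public boolean hasNext() {
        while (!ready && rows.hasNext()) {
          T candidate = rows.next();
          if (!deletedRowIndexes.contains(rowIndex++)) {
            pending = candidate;
            ready = true;
          }
        }
        return ready;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        ready = false;
        return pending;
      }
    };
  }

  public static void main(String[] args) {
    // Rows at indexes 1 and 2 are marked deleted; only r0 and r3 are emitted.
    Iterator<String> kept =
        filterDeleted(List.of("r0", "r1", "r2", "r3").iterator(), Set.of(1L, 2L));
    kept.forEachRemaining(System.out::println);
  }
}

Unlike the sketch, which looks ahead by one element, handleColumnarBatch in the patch appears to assume that at least one row of each ColumnarBatch survives filtering; a batch whose rows are all deleted would make currentBatchIterator.next() throw.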
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java
index db0961d08e0..0a32598cf86 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java
@@ -15,7 +15,11 @@
*/
package io.delta.kernel.spark.read;
+import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -30,11 +34,9 @@
import org.apache.spark.sql.execution.datasources.FilePartition;
import org.apache.spark.sql.execution.datasources.FilePartition$;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.types.StructType;
import scala.Function1;
import scala.Option;
import scala.Tuple2;
@@ -43,9 +45,9 @@
public class SparkBatch implements Batch {
private final String tablePath;
- private final StructType readDataSchema;
- private final StructType dataSchema;
- private final StructType partitionSchema;
+ private final org.apache.spark.sql.types.StructType readDataSchema;
+ private final org.apache.spark.sql.types.StructType dataSchema;
+ private final org.apache.spark.sql.types.StructType partitionSchema;
private final Predicate[] pushedToKernelFilters;
private final Filter[] dataFilters;
private final Configuration hadoopConf;
@@ -53,19 +55,24 @@ public class SparkBatch implements Batch {
private final long totalBytes;
private scala.collection.immutable.Map<String, String> scalaOptions;
private final List<PartitionedFile> partitionedFiles;
+ private final SnapshotImpl snapshot;
+ private final Engine kernelEngine;
public SparkBatch(
+ Engine kernelEngine,
String tablePath,
- StructType dataSchema,
- StructType partitionSchema,
- StructType readDataSchema,
+ org.apache.spark.sql.types.StructType dataSchema,
+ org.apache.spark.sql.types.StructType partitionSchema,
+ org.apache.spark.sql.types.StructType readDataSchema,
List<PartitionedFile> partitionedFiles,
Predicate[] pushedToKernelFilters,
Filter[] dataFilters,
long totalBytes,
scala.collection.immutable.Map<String, String> scalaOptions,
- Configuration hadoopConf) {
+ Configuration hadoopConf,
+ SnapshotImpl snapshot) {
+ this.kernelEngine = Objects.requireNonNull(kernelEngine, "kernelEngine is null");
this.tablePath = Objects.requireNonNull(tablePath, "tableName is null");
this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null");
@@ -82,6 +89,7 @@ public SparkBatch(
this.totalBytes = totalBytes;
this.scalaOptions = Objects.requireNonNull(scalaOptions, "scalaOptions is null");
this.hadoopConf = Objects.requireNonNull(hadoopConf, "hadoopConf is null");
+ this.snapshot = Objects.requireNonNull(snapshot, "snapshot is null");
this.sqlConf = SQLConf.get();
}
@@ -105,8 +113,12 @@ public PartitionReaderFactory createReaderFactory() {
new Tuple2<>(
FileFormat$.MODULE$.OPTION_RETURNING_BATCH(),
String.valueOf(enableVectorizedReader)));
+
+ Protocol protocol = snapshot.getProtocol();
+ Metadata metadata = snapshot.getMetadata();
+
Function1<PartitionedFile, Iterator<InternalRow>> readFunc =
- new ParquetFileFormat()
+ new DeltaParquetFileFormat(kernelEngine, protocol, metadata, tablePath)
.buildReaderWithPartitionValues(
SparkSession.active(),
dataSchema,
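One subtlety behind the OPTION_RETURNING_BATCH flag set a few lines above: when vectorized reading is enabled, Spark's Parquet reader yields ColumnarBatch objects through a function whose declared type is still Function1<PartitionedFile, Iterator<InternalRow>>. That is why DeltaParquetFileFormat dispatches on the runtime type of each element. A small sketch of a consumer that has to do the same (an illustrative helper, not part of the patch):

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import scala.Function1;

// Counts the rows produced for one file, whether the reader returns whole
// ColumnarBatch objects (vectorized path) or individual InternalRows.
final class ReaderOutputSketch {
  static long countRows(
      Function1<PartitionedFile, scala.collection.Iterator<InternalRow>> readFunc,
      PartitionedFile file) {
    long count = 0;
    scala.collection.Iterator<InternalRow> rows = readFunc.apply(file);
    while (rows.hasNext()) {
      Object next = rows.next(); // may actually be a ColumnarBatch
      if (next instanceof ColumnarBatch) {
        count += ((ColumnarBatch) next).numRows();
      } else {
        count += 1; // plain row-based output
      }
    }
    return count;
  }
}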
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
index 656d364e70d..acad3401dcb 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
@@ -22,7 +22,10 @@
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.spark.utils.ScalaUtils;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
@@ -62,6 +65,8 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim
private final scala.collection.immutable.Map<String, String> scalaOptions;
private final SQLConf sqlConf;
private final ZoneId zoneId;
+ private final SnapshotImpl snapshot;
+ private final Engine kernelEngine;
// Planned input files and stats
private List<PartitionedFile> partitionedFiles = new ArrayList<>();
@@ -76,7 +81,8 @@ public SparkScan(
Predicate[] pushedToKernelFilters,
Filter[] dataFilters,
io.delta.kernel.Scan kernelScan,
- CaseInsensitiveStringMap options) {
+ CaseInsensitiveStringMap options,
+ SnapshotImpl snapshot) {
final String normalizedTablePath = Objects.requireNonNull(tablePath, "tablePath is null");
this.tablePath =
@@ -89,10 +95,12 @@ public SparkScan(
this.dataFilters = dataFilters == null ? new Filter[0] : dataFilters.clone();
this.kernelScan = Objects.requireNonNull(kernelScan, "kernelScan is null");
this.options = Objects.requireNonNull(options, "options is null");
+ this.snapshot = Objects.requireNonNull(snapshot, "snapshot is null");
this.scalaOptions = ScalaUtils.toScalaMap(options);
this.hadoopConf = SparkSession.active().sessionState().newHadoopConfWithOptions(scalaOptions);
this.sqlConf = SQLConf.get();
this.zoneId = ZoneId.of(sqlConf.sessionLocalTimeZone());
+ this.kernelEngine = DefaultEngine.create(hadoopConf);
}
/**
@@ -112,6 +120,7 @@ public StructType readSchema() {
public Batch toBatch() {
ensurePlanned();
return new SparkBatch(
+ kernelEngine,
tablePath,
dataSchema,
partitionSchema,
@@ -121,7 +130,8 @@ public Batch toBatch() {
dataFilters,
totalBytes,
scalaOptions,
- hadoopConf);
+ hadoopConf,
+ snapshot);
}
@Override
@@ -206,8 +216,6 @@ private void planScanFiles() {
kernelScan.getScanFiles(tableEngine);
final String[] locations = new String[0];
- final scala.collection.immutable.Map<String, Object> otherConstantMetadataColumnValues =
- scala.collection.immutable.Map$.MODULE$.empty();
while (scanFileBatches.hasNext()) {
final io.delta.kernel.data.FilteredColumnarBatch batch = scanFileBatches.next();
@@ -217,6 +225,23 @@ private void planScanFiles() {
final Row row = addFileRowIter.next();
final AddFile addFile = new AddFile(row.getStruct(0));
+ // Extract deletion vector descriptor if present
+ scala.collection.immutable.Map<String, Object> metadata;
+ DeletionVectorDescriptor dvDescriptor =
+ InternalScanFileUtils.getDeletionVectorDescriptorFromRow(row);
+
+ if (dvDescriptor != null) {
+ // Pass DV descriptor as metadata - use Scala immutable HashMap
+ scala.collection.immutable.HashMap<String, Object> emptyHashMap =
+ new scala.collection.immutable.HashMap<>();
+ metadata =
+ (scala.collection.immutable.Map<String, Object>)
+ emptyHashMap.$plus(
+ new scala.Tuple2<>(DeltaParquetFileFormat.DV_DESCRIPTOR_KEY, dvDescriptor));
+ } else {
+ metadata = scala.collection.immutable.Map$.MODULE$.empty();
+ }
+
final PartitionedFile partitionedFile =
new PartitionedFile(
getPartitionRow(addFile.getPartitionValues()),
@@ -226,7 +251,7 @@ private void planScanFiles() {
locations,
addFile.getModificationTime(),
addFile.getSize(),
- otherConstantMetadataColumnValues);
+ metadata);
totalBytes += addFile.getSize();
partitionedFiles.add(partitionedFile);
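The descriptor attached here is read back on the executor side by DeltaParquetFileFormat.extractDeletionVectorDescriptor, so the only contract between the two files is the DV_DESCRIPTOR_KEY entry in otherConstantMetadataColumnValues. A standalone sketch of that round trip through a Scala immutable map from Java, mirroring the calls used above (a String placeholder stands in for DeletionVectorDescriptor):

import scala.Option;
import scala.Tuple2;
import scala.collection.immutable.HashMap;
import scala.collection.immutable.Map;

// Build a one-entry scala.collection.immutable.Map from Java (as the scan does)
// and read it back the way DeltaParquetFileFormat does on the executor.
final class DvMetadataRoundTripSketch {
  private static final String DV_DESCRIPTOR_KEY = "__delta_dv_descriptor";

  public static void main(String[] args) {
    Object descriptor = "dv-descriptor-placeholder"; // stands in for DeletionVectorDescriptor

    @SuppressWarnings("unchecked")
    Map<String, Object> metadata =
        (Map<String, Object>)
            new HashMap<String, Object>().$plus(new Tuple2<>(DV_DESCRIPTOR_KEY, descriptor));

    Option<Object> dvOption = metadata.get(DV_DESCRIPTOR_KEY);
    if (dvOption.isDefined()) {
      System.out.println("Deletion vector descriptor found: " + dvOption.get());
    }
  }
}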
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java
index 719c0f12259..b50b0f49e6c 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java
@@ -45,6 +45,7 @@ public class SparkScanBuilder
private final CaseInsensitiveStringMap options;
private final Set<String> partitionColumnSet;
private StructType requiredDataSchema;
+ private final SnapshotImpl snapshot;
// pushedKernelPredicates: Predicates that have been pushed down to the Delta Kernel for
// evaluation.
// pushedSparkFilters: The same pushed predicates, but represented using Spark’s {@link Filter}
@@ -60,7 +61,8 @@ public SparkScanBuilder(
StructType partitionSchema,
SnapshotImpl snapshot,
CaseInsensitiveStringMap options) {
- this.kernelScanBuilder = requireNonNull(snapshot, "snapshot is null").getScanBuilder();
+ this.snapshot = requireNonNull(snapshot, "snapshot is null");
+ this.kernelScanBuilder = this.snapshot.getScanBuilder();
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.dataSchema = requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = requireNonNull(partitionSchema, "partitionSchema is null");
@@ -153,7 +155,8 @@ public org.apache.spark.sql.connector.read.Scan build() {
pushedKernelPredicates,
dataFilters,
kernelScanBuilder.build(),
- options);
+ options,
+ snapshot);
}
CaseInsensitiveStringMap getOptions() {
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ExpressionUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ExpressionUtils.java
index 7d9c3b283e4..c6f31f29bed 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ExpressionUtils.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ExpressionUtils.java
@@ -36,6 +36,7 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import scala.collection.JavaConverters;
+import scala.collection.Seq;
/**
* Utility class for converting Spark SQL filter expressions to Delta Kernel predicates.
@@ -64,7 +65,7 @@ public final class ExpressionUtils {
* not supported, along with a boolean indicating whether the conversion was partial
*/
public static ConvertedPredicate convertSparkFilterToKernelPredicate(Filter filter) {
- return convertSparkFilterToKernelPredicate(filter, true /*canPartialPushDown*/);
+ return convertSparkFilterToKernelPredicate(filter, true /* canPartialPushDown */);
}
/**
@@ -697,5 +698,113 @@ private static Optional dsv2ExpressionToCatalystExpression(
}
}
+ /**
+ * Convert Spark filters to use physical column names for Column Mapping support.
+ *
+ * This method recursively transforms all column references in filters from logical names to
+ * physical names based on the provided schema mapping. This is essential for Column Mapping
+ * feature where Parquet files use physical column names (e.g., "col-abc123" for ID mapping).
+ *
+ * @param filters The filters with logical column names
+ * @param logicalSchema The logical schema
+ * @param physicalSchema The physical schema (must have same structure as logical schema)
+ * @return Immutable Seq of filters with physical column names
+ */
+ public static scala.collection.immutable.Seq<Filter> convertFiltersToPhysicalNames(
+ Seq<Filter> filters, StructType logicalSchema, StructType physicalSchema) {
+
+ // Build logical -> physical column name mapping
+ Map<String, String> logicalToPhysicalMap = new HashMap<>();
+ for (int i = 0; i < logicalSchema.fields().length; i++) {
+ String logicalName = logicalSchema.fields()[i].name();
+ String physicalName = physicalSchema.fields()[i].name();
+ logicalToPhysicalMap.put(logicalName, physicalName);
+ }
+
+ // Convert each filter
+ List<Filter> convertedFilters = new ArrayList<>();
+ List<Filter> filterList = JavaConverters.seqAsJavaList(filters);
+ for (Filter filter : filterList) {
+ convertedFilters.add(convertFilterToPhysicalNames(filter, logicalToPhysicalMap));
+ }
+
+ return JavaConverters.asScalaBuffer(convertedFilters).toSeq();
+ }
+
+ /**
+ * Recursively convert a single filter to use physical column names.
+ *
+ * Supports all common Spark filter types including comparison, logical, string, and null
+ * operations.
+ */
+ private static Filter convertFilterToPhysicalNames(
+ Filter filter, Map<String, String> logicalToPhysicalMap) {
+ if (filter instanceof EqualTo) {
+ EqualTo eq = (EqualTo) filter;
+ return new EqualTo(getPhysicalColumnName(eq.attribute(), logicalToPhysicalMap), eq.value());
+ } else if (filter instanceof EqualNullSafe) {
+ EqualNullSafe eq = (EqualNullSafe) filter;
+ return new EqualNullSafe(
+ getPhysicalColumnName(eq.attribute(), logicalToPhysicalMap), eq.value());
+ } else if (filter instanceof GreaterThan) {
+ GreaterThan gt = (GreaterThan) filter;
+ return new GreaterThan(
+ getPhysicalColumnName(gt.attribute(), logicalToPhysicalMap), gt.value());
+ } else if (filter instanceof GreaterThanOrEqual) {
+ GreaterThanOrEqual gte = (GreaterThanOrEqual) filter;
+ return new GreaterThanOrEqual(
+ getPhysicalColumnName(gte.attribute(), logicalToPhysicalMap), gte.value());
+ } else if (filter instanceof LessThan) {
+ LessThan lt = (LessThan) filter;
+ return new LessThan(getPhysicalColumnName(lt.attribute(), logicalToPhysicalMap), lt.value());
+ } else if (filter instanceof LessThanOrEqual) {
+ LessThanOrEqual lte = (LessThanOrEqual) filter;
+ return new LessThanOrEqual(
+ getPhysicalColumnName(lte.attribute(), logicalToPhysicalMap), lte.value());
+ } else if (filter instanceof In) {
+ In in = (In) filter;
+ return new In(getPhysicalColumnName(in.attribute(), logicalToPhysicalMap), in.values());
+ } else if (filter instanceof IsNull) {
+ IsNull isNull = (IsNull) filter;
+ return new IsNull(getPhysicalColumnName(isNull.attribute(), logicalToPhysicalMap));
+ } else if (filter instanceof IsNotNull) {
+ IsNotNull isNotNull = (IsNotNull) filter;
+ return new IsNotNull(getPhysicalColumnName(isNotNull.attribute(), logicalToPhysicalMap));
+ } else if (filter instanceof StringStartsWith) {
+ StringStartsWith startsWith = (StringStartsWith) filter;
+ return new StringStartsWith(
+ getPhysicalColumnName(startsWith.attribute(), logicalToPhysicalMap), startsWith.value());
+ } else if (filter instanceof StringEndsWith) {
+ StringEndsWith endsWith = (StringEndsWith) filter;
+ return new StringEndsWith(
+ getPhysicalColumnName(endsWith.attribute(), logicalToPhysicalMap), endsWith.value());
+ } else if (filter instanceof StringContains) {
+ StringContains contains = (StringContains) filter;
+ return new StringContains(
+ getPhysicalColumnName(contains.attribute(), logicalToPhysicalMap), contains.value());
+ } else if (filter instanceof org.apache.spark.sql.sources.And) {
+ org.apache.spark.sql.sources.And and = (org.apache.spark.sql.sources.And) filter;
+ return new org.apache.spark.sql.sources.And(
+ convertFilterToPhysicalNames(and.left(), logicalToPhysicalMap),
+ convertFilterToPhysicalNames(and.right(), logicalToPhysicalMap));
+ } else if (filter instanceof org.apache.spark.sql.sources.Or) {
+ org.apache.spark.sql.sources.Or or = (org.apache.spark.sql.sources.Or) filter;
+ return new org.apache.spark.sql.sources.Or(
+ convertFilterToPhysicalNames(or.left(), logicalToPhysicalMap),
+ convertFilterToPhysicalNames(or.right(), logicalToPhysicalMap));
+ } else if (filter instanceof Not) {
+ Not not = (Not) filter;
+ return new Not(convertFilterToPhysicalNames(not.child(), logicalToPhysicalMap));
+ } else {
+ // Unknown filter type, return as-is
+ return filter;
+ }
+ }
+
+ /** Get physical column name from logical name, fallback to logical name if not found. */
+ private static String getPhysicalColumnName(String logicalName, Map<String, String> mapping) {
+ return mapping.getOrDefault(logicalName, logicalName);
+ }
+
private ExpressionUtils() {}
}
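A usage sketch for the new convertFiltersToPhysicalNames helper, showing the logical-to-physical rename on a single pushed filter. The physical column name "col-1a2b3c" is a made-up example of an ID-mapping name, and the two schemas must have the same field order, which is what ColumnMapping.convertToPhysicalSchema guarantees for the caller in DeltaParquetFileFormat:

import java.util.Arrays;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;

// Rewrites a filter on logical column "id" so it references the physical
// Parquet column name, as required when column mapping is enabled.
final class FilterRenameSketch {
  public static void main(String[] args) {
    StructType logical = new StructType().add("id", DataTypes.LongType);
    StructType physical = new StructType().add("col-1a2b3c", DataTypes.LongType);

    scala.collection.immutable.Seq<Filter> rewritten =
        io.delta.kernel.spark.utils.ExpressionUtils.convertFiltersToPhysicalNames(
            JavaConverters.asScalaBuffer(Arrays.<Filter>asList(new EqualTo("id", 5L))).toSeq(),
            logical,
            physical);

    System.out.println(rewritten.head()); // EqualTo(col-1a2b3c,5)
  }
}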
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java
index 68561aac93c..43e5b7939ce 100644
--- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java
@@ -603,10 +603,7 @@ public void testAllGoldenTables() {
"deltalog-state-reconstruction-from-checkpoint-missing-metadata",
// [DELTA_STATE_RECOVER_ERROR] The protocol of your Delta table could not be recovered
// while Reconstructing
- "deltalog-state-reconstruction-from-checkpoint-missing-protocol",
- // Answer mismatch
- "dv-partitioned-with-checkpoint",
- "dv-with-columnmapping");
+ "deltalog-state-reconstruction-from-checkpoint-missing-protocol");
for (String tableName : tableNames) {
if (unsupportedTables.contains(tableName)) {