From 242b837c266d10c1429dd47564bc5ed8ca959e30 Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Thu, 29 Dec 2022 01:57:53 -0800
Subject: [PATCH] Add `RowIndexFilter` interface and implementation

This PR is part of the feature: Support reading Delta tables with deletion
vectors (more details at https://github.com/delta-io/delta/issues/1485).

It adds an interface called `RowIndexFilter` which evaluates, for each row,
whether the row should be kept in the output. `DeletedRowsMarkingFilter`
implements `RowIndexFilter` to filter out rows that are marked as deleted in
a deletion vector. In the final integration, this filter is applied just after
fetching the rows from the data Parquet file. Refer to task IDs 7 and 8 in the
[project plan](https://github.com/delta-io/delta/issues/1485).

A test suite is added.

GitOrigin-RevId: 0ba60f880f7a83304142f4b021fd71b170d74356
---
 .../spark/sql/delta/RowIndexFilter.java | 45 +++++++
 .../DeletedRowsMarkingFilter.scala | 83 ++++++++++++
 .../DeletedRowsMarkingFilterSuite.scala | 124 ++++++++++++++++++
 3 files changed, 252 insertions(+)
 create mode 100644 core/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
 create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilter.scala
 create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilterSuite.scala

diff --git a/core/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java b/core/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
new file mode 100644
index 00000000000..5fdfbfb2d49
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta;
+
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+/**
+ * Provides filtering information for each row index within a given range.
+ * Specific filters are implemented in subclasses.
+ */
+public interface RowIndexFilter {
+
+  /**
+   * Materialize filtering information for all rows in the range [start, end)
+   * by filling a byte column vector batch.
+   *
+   * @param start Beginning row index of the filtering range (inclusive)
+   * @param end End row index of the filtering range (exclusive)
+   * @param batch The column vector for the current batch to materialize the range into
+   */
+  void materializeIntoVector(long start, long end, WritableColumnVector batch);
+
+  /**
+   * Value that must be materialized for a row to be kept after filtering.
+   */
+  public static final byte KEEP_ROW_VALUE = 0;
+  /**
+   * Value that must be materialized for a row to be dropped during filtering.
+   */
+  public static final byte DROP_ROW_VALUE = 1;
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilter.scala b/core/src/main/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilter.scala
new file mode 100644
index 00000000000..7473f564059
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilter.scala
@@ -0,0 +1,83 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.deletionvectors
+
+import org.apache.spark.sql.delta.RowIndexFilter
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+
+/**
+ * Implementation of [[RowIndexFilter]] which checks, for a given row index and deletion vector,
+ * whether the row index is present in the deletion vector. If present, the row is marked for
+ * skipping.
+ * @param bitmap Represents the deletion vector
+ */
+final class DeletedRowsMarkingFilter(bitmap: RoaringBitmapArray) extends RowIndexFilter {
+
+  override def materializeIntoVector(start: Long, end: Long, batch: WritableColumnVector): Unit = {
+    val batchSize = (end - start).toInt
+    var rowId = 0
+    while (rowId < batchSize) {
+      val isContained = bitmap.contains(start + rowId.toLong)
+      val filterOutput = if (isContained) {
+        RowIndexFilter.DROP_ROW_VALUE
+      } else {
+        RowIndexFilter.KEEP_ROW_VALUE
+      }
+      batch.putByte(rowId, filterOutput)
+      rowId += 1
+    }
+  }
+}
+
+object DeletedRowsMarkingFilter {
+  /**
+   * Utility method that creates a [[RowIndexFilter]] to filter out row indices that
+   * are present in the given deletion vector.
+   */
+  def createInstance(
+      deletionVector: DeletionVectorDescriptor,
+      hadoopConf: Configuration,
+      tablePath: Option[Path]): RowIndexFilter = {
+    if (deletionVector.cardinality == 0) {
+      // no rows are deleted according to the deletion vector, create a constant row index filter
+      // that keeps all rows
+      new KeepAllRowsFilter
+    } else {
+      val dvStore = DeletionVectorStore.createInstance(hadoopConf)
+      val storedBitmap = new StoredBitmap(deletionVector, tablePath)
+      val bitmap = storedBitmap.load(dvStore)
+      new DeletedRowsMarkingFilter(bitmap)
+    }
+  }
+
+  private class KeepAllRowsFilter extends RowIndexFilter {
+    override def materializeIntoVector(
+        start: Long, end: Long, batch: WritableColumnVector): Unit = {
+      val batchSize = (end - start).toInt
+      var rowId = 0
+      while (rowId < batchSize) {
+        batch.putByte(rowId, RowIndexFilter.KEEP_ROW_VALUE)
+        rowId += 1
+      }
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilterSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilterSuite.scala
new file mode 100644
index 00000000000..5017d46e43f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletedRowsMarkingFilterSuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.deletionvectors
+
+import org.apache.spark.sql.delta.RowIndexFilter
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor._
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore._
+import org.apache.spark.sql.delta.util.PathWithFileSystem
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.ByteType
+import org.apache.spark.util.Utils
+
+class DeletedRowsMarkingFilterSuite extends QueryTest with SharedSparkSession {
+
+  test("empty deletion vector") {
+    val rowIndexFilter = DeletedRowsMarkingFilter.createInstance(
+      DeletionVectorDescriptor.EMPTY,
+      newHadoopConf,
+      tablePath = None)
+
+    assert(eval(rowIndexFilter, start = 0, end = 20) === Seq.empty)
+    assert(eval(rowIndexFilter, start = 20, end = 200) === Seq.empty)
+    assert(eval(rowIndexFilter, start = 200, end = 2000) === Seq.empty)
+  }
+
+  Seq(true, false).foreach { isInline =>
+    test(s"deletion vector single row deleted - isInline=$isInline") {
+      withTempDir { tableDir =>
+        val tablePath = stringToPath(tableDir.toString)
+        val dv = createDV(isInline, tablePath, 25)
+
+        val rowIndexFilter = DeletedRowsMarkingFilter.createInstance(dv, newHadoopConf, Some(tablePath))
+
+        assert(eval(rowIndexFilter, start = 0, end = 20) === Seq.empty)
+        assert(eval(rowIndexFilter, start = 20, end = 35) === Seq(25))
+        assert(eval(rowIndexFilter, start = 35, end = 325) === Seq.empty)
+      }
+    }
+  }
+
+  Seq(true, false).foreach { isInline =>
+    test(s"deletion vector with multiple rows deleted - isInline=$isInline") {
+      withTempDir { tableDir =>
+        val tablePath = stringToPath(tableDir.toString)
+        val dv = createDV(isInline, tablePath, 0, 25, 35, 2000, 50000)
+
+        val rowIndexFilter = DeletedRowsMarkingFilter.createInstance(dv, newHadoopConf, Some(tablePath))
+
+        assert(eval(rowIndexFilter, start = 0, end = 20) === Seq(0))
+        assert(eval(rowIndexFilter, start = 20, end = 35) === Seq(25))
+        assert(eval(rowIndexFilter, start = 35, end = 325) === Seq(35))
+        assert(eval(rowIndexFilter, start = 325, end = 1000) === Seq.empty)
+        assert(eval(rowIndexFilter, start = 1000, end = 60000) === Seq(2000, 50000))
+        assert(eval(rowIndexFilter, start = 60000, end = 800000) === Seq.empty)
+      }
+    }
+  }
+
+  private def newBatch(capacity: Int): WritableColumnVector =
+    new OnHeapColumnVector(capacity, ByteType)
+
+  protected def newHadoopConf: Configuration = {
+    // scalastyle:off deltahadoopconfiguration
+    spark.sessionState.newHadoopConf()
+    // scalastyle:on deltahadoopconfiguration
+  }
+
+  /**
+   * Helper method that creates a DV with the given deleted row indices and returns
+   * a [[DeletionVectorDescriptor]]. The created DV can be inline or on disk.
+   */
+  protected def createDV(
+      isInline: Boolean, tablePath: Path, deletedRows: Long*): DeletionVectorDescriptor = {
+    val bitmap = RoaringBitmapArray(deletedRows: _*)
+    val serializedBitmap = bitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
+    val cardinality = deletedRows.size
+    if (isInline) {
+      inlineInLog(serializedBitmap, cardinality)
+    } else {
+      val tableWithFS = PathWithFileSystem.withConf(tablePath, newHadoopConf).makeQualified()
+      val dvPath = dvStore.generateUniqueNameInTable(tableWithFS)
+      val dvRange = Utils.tryWithResource(dvStore.createWriter(dvPath)) { writer =>
+        writer.write(serializedBitmap)
+      }
+      onDiskWithAbsolutePath(
+        pathToString(dvPath.path), dvRange.length, cardinality, Some(dvRange.offset))
+    }
+  }
+
+  /** Evaluate the given row index filter and return the sequence of dropped row indices */
+  protected def eval(rowIndexFilter: RowIndexFilter, start: Long, end: Long): Seq[Long] = {
+    val batchSize = (end - start).toInt
+    val batch = newBatch(batchSize)
+    rowIndexFilter.materializeIntoVector(start, end, batch)
+    batch.getBytes(0, batchSize).toSeq
+      .zip(Seq.range(start, end))
+      .filter(_._1 == RowIndexFilter.DROP_ROW_VALUE) // keep only rows marked as dropped
+      .map(_._2) // select the row index
+      .toSeq
+  }
+
+  lazy val dvStore: DeletionVectorStore = DeletionVectorStore.createInstance(newHadoopConf)
+}
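
Not part of the patch: below is a minimal sketch of how a reader could consume this filter once it is wired in after the Parquet scan, assuming a `DeletionVectorDescriptor`, a Hadoop configuration, and an optional table path are already available for the file being read. The object and method names (`RowIndexFilterExample`, `keptRowIndexes`) are hypothetical and only illustrate the materialize-and-check flow used by the classes above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.deletionvectors.DeletedRowsMarkingFilter
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.ByteType

object RowIndexFilterExample {

  /** Returns the row indices in [start, end) that survive the deletion vector. */
  def keptRowIndexes(
      dv: DeletionVectorDescriptor,
      hadoopConf: Configuration,
      tablePath: Option[Path],
      start: Long,
      end: Long): Seq[Long] = {
    // Build the filter once per file; an empty DV yields a constant keep-all filter.
    val filter: RowIndexFilter = DeletedRowsMarkingFilter.createInstance(dv, hadoopConf, tablePath)
    val batchSize = (end - start).toInt
    // One byte per row in the batch: KEEP_ROW_VALUE (0) or DROP_ROW_VALUE (1).
    val mask = new OnHeapColumnVector(batchSize, ByteType)
    try {
      filter.materializeIntoVector(start, end, mask)
      (0 until batchSize).collect {
        case i if mask.getByte(i) == RowIndexFilter.KEEP_ROW_VALUE => start + i
      }
    } finally {
      mask.close()
    }
  }
}

In the final integration (tasks 7 and 8 in the project plan) the materialized mask would be applied to the column batch fetched from the Parquet file rather than collected into indices; the sketch only shows how the filter is created and materialized.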