Add RowIndexFilter interface and implementation
This PR is part of the feature: Support reading Delta tables with deletion vectors (more details at delta-io#1485)

It adds an interface called `RowIndexFilter`, which evaluates whether to keep a row in the output. `DeletedRowsMarkingFilter` implements `RowIndexFilter` to filter out rows that a deletion vector marks as deleted. In the final integration, this filter is applied just after the rows are fetched from the data parquet file. Refer to task IDs 7 and 8 in the [project plan](delta-io#1485).

A test suite is added.

GitOrigin-RevId: 0ba60f880f7a83304142f4b021fd71b170d74356
vkorukanti committed Dec 29, 2022
1 parent 3a14689 commit 242b837
Showing 3 changed files with 252 additions and 0 deletions.
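
For context, here is a rough sketch of how these pieces are meant to fit together at the read path, per the description above. It is illustrative only and not part of the diff below: `markAndEmitBatch`, `emitRow`, and the batch boundaries are hypothetical placeholders for the reader-side integration tracked as tasks 7 and 8.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.deletionvectors.DeletedRowsMarkingFilter
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.ByteType

// Sketch: mark deleted rows for one batch of rows read from a data parquet file.
// `emitRow` stands in for whatever the reader does with a surviving row.
def markAndEmitBatch(
    dv: DeletionVectorDescriptor,
    hadoopConf: Configuration,
    tablePath: Path,
    batchStartRowIndex: Long, // file-level row index of the first row in this batch
    batchNumRows: Int,
    emitRow: Int => Unit): Unit = {
  val filter: RowIndexFilter =
    DeletedRowsMarkingFilter.createInstance(dv, hadoopConf, Some(tablePath))

  // One byte per row: KEEP_ROW_VALUE (0) or DROP_ROW_VALUE (1).
  val marks = new OnHeapColumnVector(batchNumRows, ByteType)
  filter.materializeIntoVector(
    batchStartRowIndex, batchStartRowIndex + batchNumRows, marks)

  var i = 0
  while (i < batchNumRows) {
    if (marks.getByte(i) == RowIndexFilter.KEEP_ROW_VALUE) {
      emitRow(i) // position i in the batch corresponds to row index batchStartRowIndex + i
    }
    i += 1
  }
  marks.close()
}
```

In an actual reader the filter would presumably be created once per file and reused across batches; per-batch creation here only keeps the sketch self-contained.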
45 changes: 45 additions & 0 deletions core/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -0,0 +1,45 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta;

import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
 * Provides filtering information for each row index within a given range.
 * Specific filters are implemented in subclasses.
 */
public interface RowIndexFilter {

  /**
   * Materialize filtering information for all rows in the range [start, end)
   * by filling a boolean column vector batch.
   *
   * @param start Beginning row index of the filtering range (inclusive)
   * @param end   End row index of the filtering range (exclusive)
   * @param batch The column vector for the current batch to materialize the range into
   */
  void materializeIntoVector(long start, long end, WritableColumnVector batch);

  /**
   * Value that must be materialized for a row to be kept after filtering.
   */
  public static final byte KEEP_ROW_VALUE = 0;

  /**
   * Value that must be materialized for a row to be dropped during filtering.
   */
  public static final byte DROP_ROW_VALUE = 1;
}
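
To illustrate the contract, one byte per row in [start, end) with positions relative to `start`, here is a minimal sketch of a toy implementation (not part of this commit) that drops every tenth row:

```scala
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.execution.vectorized.WritableColumnVector

// Toy filter: drop every row whose file-level row index is a multiple of 10.
class DropEveryTenthRowFilter extends RowIndexFilter {
  override def materializeIntoVector(
      start: Long, end: Long, batch: WritableColumnVector): Unit = {
    val batchSize = (end - start).toInt
    var rowId = 0
    while (rowId < batchSize) {
      // Position rowId in the batch corresponds to row index (start + rowId) in the file.
      val value =
        if ((start + rowId) % 10 == 0) RowIndexFilter.DROP_ROW_VALUE
        else RowIndexFilter.KEEP_ROW_VALUE
      batch.putByte(rowId, value)
      rowId += 1
    }
  }
}
```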
83 changes: 83 additions & 0 deletions DeletedRowsMarkingFilter.scala
@@ -0,0 +1,83 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.deletionvectors

import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.vectorized.WritableColumnVector

/**
 * Implementation of [[RowIndexFilter]] which checks, for a given row index and deletion vector,
 * whether the row index is present in the deletion vector. If present, the row is marked for
 * skipping.
 *
 * @param bitmap Represents the deletion vector
 */
final class DeletedRowsMarkingFilter(bitmap: RoaringBitmapArray) extends RowIndexFilter {

  override def materializeIntoVector(
      start: Long, end: Long, batch: WritableColumnVector): Unit = {
    val batchSize = (end - start).toInt
    var rowId = 0
    while (rowId < batchSize) {
      val isContained = bitmap.contains(start + rowId.toLong)
      val filterOutput = if (isContained) {
        RowIndexFilter.DROP_ROW_VALUE
      } else {
        RowIndexFilter.KEEP_ROW_VALUE
      }
      batch.putByte(rowId, filterOutput)
      rowId += 1
    }
  }
}

object DeletedRowsMarkingFilter {
  /**
   * Utility method that creates a [[RowIndexFilter]] to filter out row indices that
   * are present in the given deletion vector.
   */
  def createInstance(
      deletionVector: DeletionVectorDescriptor,
      hadoopConf: Configuration,
      tablePath: Option[Path]): RowIndexFilter = {
    if (deletionVector.cardinality == 0) {
      // No rows are deleted according to the deletion vector; create a constant
      // row index filter that keeps all rows.
      new KeepAllRowsFilter
    } else {
      val dvStore = DeletionVectorStore.createInstance(hadoopConf)
      val storedBitmap = new StoredBitmap(deletionVector, tablePath)
      val bitmap = storedBitmap.load(dvStore)
      new DeletedRowsMarkingFilter(bitmap)
    }
  }

  private class KeepAllRowsFilter extends RowIndexFilter {
    override def materializeIntoVector(
        start: Long, end: Long, batch: WritableColumnVector): Unit = {
      val batchSize = (end - start).toInt
      var rowId = 0
      while (rowId < batchSize) {
        batch.putByte(rowId, RowIndexFilter.KEEP_ROW_VALUE)
        rowId += 1
      }
    }
  }
}
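
As a quick illustrative check, the bitmap-backed filter can also be constructed directly from an in-memory `RoaringBitmapArray`, bypassing `createInstance` and `DeletionVectorStore`; the inline and on-disk paths are exercised by the test suite below. This snippet is a sketch, not part of this commit.

```scala
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.deletionvectors.{DeletedRowsMarkingFilter, RoaringBitmapArray}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.ByteType

// Bitmap says rows 3 and 7 are deleted.
val filter = new DeletedRowsMarkingFilter(RoaringBitmapArray(3L, 7L))

val marks = new OnHeapColumnVector(10, ByteType)
filter.materializeIntoVector(0L, 10L, marks)

assert(marks.getByte(3) == RowIndexFilter.DROP_ROW_VALUE) // deleted
assert(marks.getByte(7) == RowIndexFilter.DROP_ROW_VALUE) // deleted
assert(marks.getByte(0) == RowIndexFilter.KEEP_ROW_VALUE) // kept
marks.close()
```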
124 changes: 124 additions & 0 deletions DeletedRowsMarkingFilterSuite.scala
@@ -0,0 +1,124 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.deletionvectors

import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor._
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore._
import org.apache.spark.sql.delta.util.PathWithFileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.ByteType
import org.apache.spark.util.Utils

class DeletedRowsMarkingFilterSuite extends QueryTest with SharedSparkSession {

  test("empty deletion vector") {
    val rowIndexFilter = DeletedRowsMarkingFilter.createInstance(
      DeletionVectorDescriptor.EMPTY,
      newHadoopConf,
      tablePath = None)

    assert(eval(rowIndexFilter, start = 0, end = 20) === Seq.empty)
    assert(eval(rowIndexFilter, start = 20, end = 200) === Seq.empty)
    assert(eval(rowIndexFilter, start = 200, end = 2000) === Seq.empty)
  }

  Seq(true, false).foreach { isInline =>
    test(s"deletion vector single row deleted - isInline=$isInline") {
      withTempDir { tableDir =>
        val tablePath = stringToPath(tableDir.toString)
        val dv = createDV(isInline, tablePath, 25)

        val rowIndexFilter =
          DeletedRowsMarkingFilter.createInstance(dv, newHadoopConf, Some(tablePath))

        assert(eval(rowIndexFilter, start = 0, end = 20) === Seq.empty)
        assert(eval(rowIndexFilter, start = 20, end = 35) === Seq(25))
        assert(eval(rowIndexFilter, start = 35, end = 325) === Seq.empty)
      }
    }
  }

  Seq(true, false).foreach { isInline =>
    test(s"deletion vector with multiple rows deleted - isInline=$isInline") {
      withTempDir { tableDir =>
        val tablePath = stringToPath(tableDir.toString)
        val dv = createDV(isInline, tablePath, 0, 25, 35, 2000, 50000)

        val rowIndexFilter =
          DeletedRowsMarkingFilter.createInstance(dv, newHadoopConf, Some(tablePath))

        assert(eval(rowIndexFilter, start = 0, end = 20) === Seq(0))
        assert(eval(rowIndexFilter, start = 20, end = 35) === Seq(25))
        assert(eval(rowIndexFilter, start = 35, end = 325) === Seq(35))
        assert(eval(rowIndexFilter, start = 325, end = 1000) === Seq.empty)
        assert(eval(rowIndexFilter, start = 1000, end = 60000) === Seq(2000, 50000))
        assert(eval(rowIndexFilter, start = 60000, end = 800000) === Seq.empty)
      }
    }
  }

  private def newBatch(capacity: Int): WritableColumnVector =
    new OnHeapColumnVector(capacity, ByteType)

  protected def newHadoopConf: Configuration = {
    // scalastyle:off deltahadoopconfiguration
    spark.sessionState.newHadoopConf()
    // scalastyle:on deltahadoopconfiguration
  }

  /**
   * Helper method that creates a DV with the given deleted row ids and returns
   * a [[DeletionVectorDescriptor]]. The created DV can be inline or on disk.
   */
  protected def createDV(
      isInline: Boolean, tablePath: Path, deletedRows: Long*): DeletionVectorDescriptor = {
    val bitmap = RoaringBitmapArray(deletedRows: _*)
    val serializedBitmap = bitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
    val cardinality = deletedRows.size
    if (isInline) {
      inlineInLog(serializedBitmap, cardinality)
    } else {
      val tableWithFS = PathWithFileSystem.withConf(tablePath, newHadoopConf).makeQualified()
      val dvPath = dvStore.generateUniqueNameInTable(tableWithFS)
      val dvRange = Utils.tryWithResource(dvStore.createWriter(dvPath)) { writer =>
        writer.write(serializedBitmap)
      }
      onDiskWithAbsolutePath(
        pathToString(dvPath.path), dvRange.length, cardinality, Some(dvRange.offset))
    }
  }

  /** Evaluates the given row index filter instance and returns the sequence of dropped row indices. */
  protected def eval(rowIndexFilter: RowIndexFilter, start: Long, end: Long): Seq[Long] = {
    val batchSize = (end - start + 1).toInt
    val batch = newBatch(batchSize)
    rowIndexFilter.materializeIntoVector(start, end, batch)
    batch.getBytes(0, batchSize).toSeq
      .zip(Seq.range(start, end))
      .filter(_._1 == RowIndexFilter.DROP_ROW_VALUE) // keep only the dropped rows
      .map(_._2) // select only the row index
      .toSeq
  }

  lazy val dvStore: DeletionVectorStore = DeletionVectorStore.createInstance(newHadoopConf)
}
