Commit c08a48c (parent 7ee05d1)
GitOrigin-RevId: 1a371c60129b789b92f494a86e164e2dd18da03d
Showing 96 changed files with 764 additions and 41 deletions.
core/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala (182 additions, 0 deletions)
@@ -0,0 +1,182 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import java.net.URI

import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

/**
 * Plan transformer to inject a filter that removes the rows marked as deleted according to
 * deletion vectors. For tables with no deletion vectors, this transformation has no effect.
 *
 * It modifies the plan for tables with deletion vectors as follows:
 * Before rule: <Parent Node> -> Delta Scan (key, value).
 *   - Here we are reading the `key`, `value` columns from the Delta table.
 * After rule:
 *   <Parent Node> ->
 *     Project(key, value) ->
 *       Filter (udf(__skip_row == 0)) ->
 *         Delta Scan (key, value, __skip_row)
 *   - Here we insert a new column `__skip_row` into the Delta scan. It is populated by the
 *     Parquet reader using the DV corresponding to the Parquet file being read
 *     (see [[DeltaParquetFileFormat]]) and contains 0 if we want to keep the row.
 *     The new scan also disables Parquet file splitting and filter pushdown, because
 *     generating `__skip_row` requires reading the rows of a file consecutively in order to
 *     compute the row index. This is a cost we need to pay until we upgrade to the latest
 *     Apache Spark, whose Parquet reader generates the row_index regardless of file
 *     splitting and filter pushdown.
 *   - The new scan also carries a broadcast variable with a Parquet file -> DV file map.
 *     The Parquet reader uses this map to find the DV file corresponding to each data file.
 *   - The Filter keeps only the rows whose `__skip_row` value equals 0.
 *   - Finally, a Project keeps the plan node output the same as before the rule is applied.
 */
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
  def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
    transformWithSubqueries(plan) {
      case ScanWithDeletionVectors(dvScan) => dvScan
    }
  }
}

object ScanWithDeletionVectors {
  def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
    case scan @ LogicalRelation(
        relation @ HadoopFsRelation(
          index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) =>
      dvEnabledScanFor(scan, relation, format, index)
    case _ => None
  }

  def dvEnabledScanFor(
      scan: LogicalRelation,
      hadoopRelation: HadoopFsRelation,
      fileFormat: DeltaParquetFileFormat,
      index: TahoeFileIndex): Option[LogicalPlan] = {
    // If the table has no DVs enabled, no change is needed.
    if (!deletionVectorsReadable(index.protocol, index.metadata)) return None

    require(!index.isInstanceOf[TahoeLogFileIndex],
      "Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

    // See if the relation was already modified to include DV reads as part of
    // a previous invocation of this rule on this table.
    if (fileFormat.hasDeletionVectorMap()) return None

    // See if any files actually have a DV.
    val spark = SparkSession.getActiveSession.get
    val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index)
    if (filePathToDVBroadcastMap.value.isEmpty) return None

    // Get the list of columns in the output of the `LogicalRelation` we are
    // trying to modify. At the end of the plan, we need to return a
    // `LogicalRelation` that has the same output as this `LogicalRelation`.
    val planOutput = scan.output

    val newScan = createScanWithSkipRowColumn(
      spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation)

    // On top of the scan, add a filter that removes the rows whose
    // skip row column value is non-zero.
    val rowIndexFilter = createRowIndexFilterNode(newScan)

    // Finally, add a project on top of the row index filter node to
    // remove the skip row column.
    Some(Project(planOutput, rowIndexFilter))
  }

  /**
   * Helper method that creates a new `LogicalRelation` for the existing scan that outputs
   * an extra column indicating whether the row needs to be skipped or not.
   */
  private def createScanWithSkipRowColumn(
      spark: SparkSession,
      inputScan: LogicalRelation,
      fileFormat: DeltaParquetFileFormat,
      tahoeFileIndex: TahoeFileIndex,
      filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptor]],
      hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
    // Create a new `LogicalRelation` that has a modified `DeltaFileFormat` and an output with
    // an extra column to indicate whether to skip the row or not.

    // Add a column for SKIP_ROW to the base output. A value of 0 means the row needs to be
    // kept; any other value means the row needs to be skipped.
    val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
    val newScanOutput = inputScan.output :+
      AttributeReference(skipRowField.name, skipRowField.dataType)()
    val newScanSchema = StructType(inputScan.schema).add(skipRowField)

    val hadoopConfBroadcast = spark.sparkContext.broadcast(
      new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf()))

    val newFileFormat = fileFormat.copyWithDVInfo(
      tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast)
    val newRelation = hadoopFsRelation.copy(
      fileFormat = newFileFormat,
      dataSchema = newScanSchema)(hadoopFsRelation.sparkSession)

    // Create a new scan LogicalRelation
    inputScan.copy(relation = newRelation, output = newScanOutput)
  }

  private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = {
    val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME)
    require(skipRowColumnRefs.size == 1,
      s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
    val skipRowColumnRef = skipRowColumnRefs.head

    val keepRow = DeltaUDF.booleanFromByte(_ == RowIndexFilter.KEEP_ROW_VALUE)
      .asNondeterministic() // To avoid constant folding the filter based on stats.

    val filterExp = keepRow(new Column(skipRowColumnRef)).expr
    Filter(filterExp, newScan)
  }

  private def createBroadcastDVMap(
      spark: SparkSession,
      tahoeFileIndex: TahoeFileIndex): Broadcast[Map[URI, DeletionVectorDescriptor]] = {
    // Given there is no way to find the final filters, just select all files in the
    // file index and create the DV map.
    val filesWithDVs =
      tahoeFileIndex.matchingFiles(Seq(TrueLiteral), Seq(TrueLiteral))
        .filter(_.deletionVector != null)
    val filePathToDVMap = filesWithDVs
      .map(x =>
        absolutePath(tahoeFileIndex.path.toString, x.path).toUri -> x.deletionVector)
      .toMap
    spark.sparkContext.broadcast(filePathToDVMap)
  }
}
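
A minimal sketch (not part of this diff) of how the new trait could be wired into a Catalyst optimizer rule. The object name `PreprocessTableWithDVsRule` is hypothetical, and the real integration point in Delta may differ:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.PreprocessTableWithDVs

// Hypothetical wiring: rewrites every DV-backed Delta scan in the plan (including
// scans nested in subqueries) into Project -> Filter -> Scan as described above.
object PreprocessTableWithDVsRule extends Rule[LogicalPlan] with PreprocessTableWithDVs {
  override def apply(plan: LogicalPlan): LogicalPlan = preprocessTablesWithDVs(plan)
}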
core/src/main/scala/org/apache/spark/sql/delta/SubqueryTransformerHelper.scala (58 additions, 0 deletions)
@@ -0,0 +1,58 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery, SupportsSubquery}

/**
 * Trait to allow processing '''all''' instances of a certain node in a subquery.
 *
 * Plain pattern matching in `transform` cannot be used here because of its short-circuiting
 * nature: it stops after the first match of the node, and the remaining nodes in the
 * subquery plan would not be transformed.
 */
trait SubqueryTransformerHelper {

  /**
   * Transform all nodes matched by the rule in the query plan rooted at the given `plan`.
   * It requires that the given plan has already gone through [[OptimizeSubqueries]] and that
   * the root node denoting a subquery has been removed and optimized appropriately.
   */
  def transformWithSubqueries(plan: LogicalPlan)
      (rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    require(!isSubqueryRoot(plan))
    transformSubqueries(plan, rule) transform (rule)
  }

  /** Is the given plan a subquery root? */
  def isSubqueryRoot(plan: LogicalPlan): Boolean = {
    plan.isInstanceOf[Subquery] || plan.isInstanceOf[SupportsSubquery]
  }

  private def transformSubqueries(
      plan: LogicalPlan,
      rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    import org.apache.spark.sql.delta.implicits._

    plan transformAllExpressionsUp {
      case subquery: SubqueryExpression =>
        subquery.withNewPlan(transformWithSubqueries(subquery.plan)(rule))
    }
  }
}
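
For illustration only (not part of this diff), a small example of how the helper is intended to be used. The object name and the rewrite it performs are made up, but it shows a rule being applied to the main plan and, through `transformWithSubqueries`, to every subquery plan as well:

import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.delta.SubqueryTransformerHelper

// Hypothetical example: drop Filter nodes whose condition is literally `true`,
// both in the outer plan and inside subquery expressions.
object RemoveTrueFilters extends SubqueryTransformerHelper {
  def apply(plan: LogicalPlan): LogicalPlan = transformWithSubqueries(plan) {
    case Filter(TrueLiteral, child) => child
  }
}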