Merge pull request #239 from osopardo1/235-add-delta-data-skipping
Add Delta Data Skipping on Staging Area
osopardo1 authored Nov 30, 2023
2 parents f6b65a8 + 681518e commit 4b32021
Showing 6 changed files with 164 additions and 85 deletions.
13 changes: 11 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala
@@ -18,15 +18,24 @@ private[spark] trait DeltaStagingUtils extends StagingUtils {
/**
* Condition for Staging AddFiles in the form of Spark sql Column
*/
private val isStagingFile: Column =
private val isStagingFileColumn: Column =
col("tags").isNull.or(col("tags.revision") === lit(stagingID.toString))

/**
* Extract current staging files from the snapshot
* @return
*/
protected def stagingFiles(): Dataset[AddFile] = {
snapshot.allFiles.where(isStagingFile)
snapshot.allFiles.where(isStagingFileColumn)
}

/**
* Whether an AddFile is a staging file or not
* @param a the AddFile
* @return boolean
*/
def isStagingFile(a: AddFile): Boolean =
a.tags == null || a.tags.isEmpty || a.tags
.getOrElse("revision", "") == stagingID.toString

}
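The trait now exposes the staging check in two shapes: `isStagingFileColumn`, pushed into Spark SQL for Dataset-level filtering, and the new `isStagingFile`, applied to individual AddFile actions. A minimal sketch of the distinction, assuming code inside a class that mixes in DeltaStagingUtils (the value names are illustrative):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.delta.actions.AddFile

// Dataset-level: the Column predicate is evaluated by Spark SQL without
// collecting anything to the driver.
val stagingDataset: Dataset[AddFile] = stagingFiles()

// Per-file: the same rule applied to AddFile actions already on the driver,
// e.g. those returned by TahoeLogFileIndex.matchingFiles in OTreeIndex.
val collected: Seq[AddFile] = snapshot.allFiles.collect().toSeq
val stagingOnly: Seq[AddFile] = collected.filter(isStagingFile)
```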
72 changes: 50 additions & 22 deletions src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala
@@ -9,7 +9,6 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow}
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType
@@ -24,7 +23,10 @@ import java.net.URI
* @param index the Tahoe log file index
* @param spark spark session
*/
case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging {
case class OTreeIndex(index: TahoeLogFileIndex)
extends FileIndex
with DeltaStagingUtils
with Logging {

/**
* Snapshot to analyze
@@ -53,45 +55,71 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging {
}

/**
* Collect Staging AddFiles from _delta_log and convert them into FileStatuses.
* The output is merged with those built from QbeastBlocks.
* Collect matching QbeastBlocks and convert them into FileStatuses.
* @param partitionFilters
* @param dataFilters
* @return
*/
private def stagingFiles: Seq[FileStatus] = {
qbeastSnapshot.loadStagingBlocks().collect().map { a: AddFile =>
new FileStatus(
/* length */ a.size,
/* isDir */ false,
/* blockReplication */ 0,
/* blockSize */ 1,
/* modificationTime */ a.modificationTime,
absolutePath(a.path))
}
}

override def listFiles(
private def qbeastMatchingFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val qbeastFileStats = matchingBlocks(partitionFilters, dataFilters)
dataFilters: Seq[Expression]): Seq[FileStatus] = {
matchingBlocks(partitionFilters, dataFilters)
.map(block => (block.file))
.map(file => (file.path, file))
.toMap
.values
.map(IndexFiles.toFileStatus(index.path))
.toArray
val stagingStats = stagingFiles
val fileStats = qbeastFileStats ++ stagingStats
.toSeq
}

/**
* Collect matching staging files from _delta_log and convert them into FileStatuses.
* The output is merged with those built from QbeastBlocks.
* @return
*/
private def stagingFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[FileStatus] = {

index
.matchingFiles(partitionFilters, dataFilters)
.filter(isStagingFile)
.map { f =>
new FileStatus(
/* length */ f.size,
/* isDir */ false,
/* blockReplication */ 0,
/* blockSize */ 1,
/* modificationTime */ f.modificationTime,
absolutePath(f.path))
}
}

override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {

// FILTER FILES FROM QBEAST
val qbeastFileStats = qbeastMatchingFiles(partitionFilters, dataFilters)
// FILTER FILES FROM DELTA
val stagingFileStats = stagingFiles(partitionFilters, dataFilters)
// JOIN BOTH FILTERED FILES
val fileStats = qbeastFileStats ++ stagingFileStats

val sc = index.spark.sparkContext
val execId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val pfStr = partitionFilters.map(f => f.toString).mkString(" ")
logInfo(s"OTreeIndex partition filters (exec id ${execId}): ${pfStr}")
val dfStr = dataFilters.map(f => f.toString).mkString(" ")
logInfo(s"OTreeIndex data filters (exec id ${execId}): ${dfStr}")

val allFilesCount = snapshot.allFiles.count
val nFiltered = allFilesCount - fileStats.length
val filteredPct = ((nFiltered * 1.0) / allFilesCount) * 100.0
val filteredMsg = f"${nFiltered} of ${allFilesCount} (${filteredPct}%.2f%%)"
logInfo(s"Qbeast filtered files (exec id ${execId}): ${filteredMsg}")

// RETURN
Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats))
}

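With this change, `listFiles` unions two independently pruned sets: Qbeast blocks selected through `matchingBlocks`, and staging files pruned by Delta's stats-based data skipping via `TahoeLogFileIndex.matchingFiles`. A rough usage sketch of that flow, mirroring the new OTreeIndexTest case further down; the table path is a placeholder and the table is assumed to be already converted:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.functions.expr
import io.qbeast.spark.delta.OTreeIndex

val spark = SparkSession.active
val tablePath = "/tmp/qbeast-table" // hypothetical location

val deltaLog = DeltaLog.forTable(spark, tablePath)
val snapshot = deltaLog.update()
val tahoeIndex =
  TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, snapshot, Seq.empty, false)
val oTreeIndex = OTreeIndex(tahoeIndex)

// Both qbeast-indexed blocks and staging (plain Delta) files are pruned
// against the data filter before listing.
val pruned = oTreeIndex.listFiles(Seq.empty, Seq(expr("id == 1").expr))
```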
70 changes: 70 additions & 0 deletions src/test/scala/io/qbeast/TestUtils.scala
@@ -0,0 +1,70 @@
package io.qbeast

import io.qbeast.spark.QbeastIntegrationTestSpec
import io.qbeast.spark.delta.OTreeIndex
import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.FileSourceScanExec

object TestUtils extends QbeastIntegrationTestSpec {

def checkLogicalFilterPushdown(sqlFilters: Seq[String], query: DataFrame): Unit = {
val leaves = query.queryExecution.sparkPlan.collectLeaves()

val dataFilters = leaves
.collectFirst {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
f.dataFilters.filterNot(_.isInstanceOf[QbeastMurmur3Hash])
}
.getOrElse(Seq.empty)

val dataFiltersSql = dataFilters.map(_.sql)
sqlFilters.foreach(filter => dataFiltersSql should contain(filter))
}

def checkFiltersArePushedDown(query: DataFrame): Unit = {
val leaves =
query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec])

leaves should not be empty

leaves.exists(p =>
p
.asInstanceOf[FileSourceScanExec]
.relation
.location
.isInstanceOf[OTreeIndex]) shouldBe true

leaves
.foreach {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
f.dataFilters.nonEmpty shouldBe true
}
}

def checkFileFiltering(query: DataFrame): Unit = {
val leaves =
query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec])

leaves should not be empty

leaves.exists(p =>
p
.asInstanceOf[FileSourceScanExec]
.relation
.location
.isInstanceOf[OTreeIndex]) shouldBe true

leaves
.foreach {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
val index = f.relation.location
val matchingFiles =
index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files)
val allFiles = index.listFiles(Seq.empty, Seq.empty).flatMap(_.files)
matchingFiles.length shouldBe <(allFiles.length)
}

}

}
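These helpers are intended to be pulled into a spec with a wildcard import, as the two suites below now do. A hypothetical usage inside a QbeastIntegrationTestSpec body; the table contents, column, and filter value are placeholders borrowed from the pushdown tests:

```scala
import io.qbeast.TestUtils._

// Hypothetical spec body: `spark` and `tmpDir` come from withSparkAndTmpDir,
// and tmpDir is assumed to already hold a table written in the "qbeast" format.
val df = spark.read.format("qbeast").load(tmpDir)
val query = df.filter("user_id == 536764969")

// The scan must be a FileSourceScanExec backed by OTreeIndex, with the filter
// pushed down and fewer files listed than a full, unfiltered listing.
checkLogicalFilterPushdown(Seq("(user_id = 536764969)"), query)
checkFiltersArePushedDown(query)
checkFileFiltering(query)
```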
30 changes: 30 additions & 0 deletions src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala
@@ -3,11 +3,13 @@ package io.qbeast.spark.delta
import io.qbeast.TestClasses.T2
import io.qbeast.core.model.Block
import io.qbeast.spark.QbeastIntegrationTestSpec
import io.qbeast.spark.internal.commands.ConvertToQbeastCommand
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.functions.expr

class OTreeIndexTest extends QbeastIntegrationTestSpec {

@@ -128,4 +130,32 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec {
oTreeIndex.sizeInBytes shouldBe sizeInBytes
})

it should "filter files with underlying data skipping" in withSparkAndTmpDir(
(spark, tmpdir) => {

import spark.implicits._
val source = Seq(1, 2, 3, 4).toDF("id")

source
.coalesce(4)
.write
.format("delta")
.save(tmpdir)

// CONVERT TO QBEAST
ConvertToQbeastCommand.apply(tmpdir, Seq("id"), 1000)

val deltaLog = DeltaLog.forTable(spark, tmpdir)
val snapshot = deltaLog.update()
val tahoeFileIndex =
TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, snapshot, Seq.empty, false)
val oTreeIndex = OTreeIndex(tahoeFileIndex)

val allFiles = snapshot.allFiles
val filteredFiles = oTreeIndex.listFiles(Seq.empty, Seq(expr("id == 1").expr)).flatMap(_.files)

allFiles.count() shouldBe 4
filteredFiles.size shouldBe 1
})

}
37 changes: 1 addition & 36 deletions src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala
@@ -1,9 +1,8 @@
package io.qbeast.spark.utils

import io.qbeast.TestUtils._
import io.qbeast.spark.QbeastIntegrationTestSpec
import io.qbeast.spark.delta.OTreeIndex
import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.functions.{avg, col, rand, regexp_replace, when}

@@ -16,40 +15,6 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
private val filter_user_equal = "(user_id = 536764969)"
private val filter_product_equal = "(product_id = 11522682)"

private def checkFiltersArePushedDown(query: DataFrame): Unit = {
val leaves =
query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec])

leaves should not be empty

leaves.exists(p =>
p
.asInstanceOf[FileSourceScanExec]
.relation
.location
.isInstanceOf[OTreeIndex]) shouldBe true

leaves
.foreach {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
f.dataFilters.nonEmpty shouldBe true
}
}

private def checkLogicalFilterPushdown(sqlFilters: Seq[String], query: DataFrame): Unit = {
val leaves = query.queryExecution.sparkPlan.collectLeaves()

val dataFilters = leaves
.collectFirst {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
f.dataFilters.filterNot(_.isInstanceOf[QbeastMurmur3Hash])
}
.getOrElse(Seq.empty)

val dataFiltersSql = dataFilters.map(_.sql)
sqlFilters.foreach(filter => dataFiltersSql should contain(filter))
}

"Qbeast" should
"return a valid filtering of the original dataset " +
"for one column" in withSparkAndTmpDir { (spark, tmpDir) =>
27 changes: 2 additions & 25 deletions src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala
@@ -1,34 +1,11 @@
package io.qbeast.spark.utils

import io.qbeast.spark.delta.OTreeIndex
import io.qbeast.TestUtils._
import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SparkSession

class QbeastSamplingTest extends QbeastIntegrationTestSpec {

private def checkFileFiltering(query: DataFrame): Unit = {
val leaves = query.queryExecution.executedPlan.collectLeaves()

leaves.exists(p =>
p
.asInstanceOf[FileSourceScanExec]
.relation
.location
.isInstanceOf[OTreeIndex]) shouldBe true

leaves
.foreach {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
val index = f.relation.location
val matchingFiles =
index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files)
val allFiles = index.inputFiles
matchingFiles.length shouldBe <(allFiles.length)
}

}

"Qbeast" should
"return a valid sample of the original dataset" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>