From f7ce934813a86864273d24c346deb53c85a5781f Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Mon, 27 Nov 2023 16:27:42 +0100 Subject: [PATCH 01/11] WIP on delta data skipping --- .../io/qbeast/spark/delta/OTreeIndex.scala | 55 ++++++++++++------- .../qbeast/spark/delta/OTreeIndexTest.scala | 25 +++++++++ 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 5425a3f4a..072c72b50 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -9,7 +9,6 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow} import org.apache.spark.sql.delta.{DeltaLog, Snapshot} -import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.types.StructType @@ -34,6 +33,8 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { private def qbeastSnapshot = DeltaQbeastSnapshot(snapshot) + private lazy val spark = index.spark + protected def absolutePath(child: String): Path = { val p = new Path(new URI(child)) if (p.isAbsolute) { @@ -53,46 +54,60 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { } /** - * Collect Staging AddFiles from _delta_log and convert them into FileStatuses. + * Collect AddFiles from _delta_log and convert them into FileStatuses. * The output is merged with those built from QbeastBlocks. * @return */ - private def stagingFiles: Seq[FileStatus] = { - qbeastSnapshot.loadStagingBlocks().collect().map { a: AddFile => + private def deltaMatchingFiles( + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[FileStatus] = { + + index.matchingFiles(partitionFilters, dataFilters).map { f => new FileStatus( - /* length */ a.size, + /* length */ f.size, /* isDir */ false, /* blockReplication */ 0, /* blockSize */ 1, - /* modificationTime */ a.modificationTime, - absolutePath(a.path)) + /* modificationTime */ f.modificationTime, + absolutePath(f.path)) } } override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val qbeastFileStats = matchingBlocks(partitionFilters, dataFilters) - .map(block => (block.file)) - .map(file => (file.path, file)) - .toMap - .values - .map(IndexFiles.toFileStatus(index.path)) - .toArray - val stagingStats = stagingFiles - val fileStats = qbeastFileStats ++ stagingStats - val sc = index.spark.sparkContext - val execId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + + // FILTER FILES FROM QBEAST + val qbeastFileStats = matchingBlocks(partitionFilters, dataFilters).map { qbeastBlock => + new FileStatus( + /* length */ qbeastBlock.size, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ qbeastBlock.modificationTime, + absolutePath(qbeastBlock.path)) + }.toArray + + // FILTER FILES FROM DELTA + val deltaFileStats = deltaMatchingFiles(partitionFilters, dataFilters) + + // JOIN BOTH FILTERED FILES + val fileStatsSet = (qbeastFileStats ++ deltaFileStats).toSet + + val execId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) val pfStr = partitionFilters.map(f => f.toString).mkString(" ") logInfo(s"OTreeIndex 
partition filters (exec id ${execId}): ${pfStr}") val dfStr = dataFilters.map(f => f.toString).mkString(" ") logInfo(s"OTreeIndex data filters (exec id ${execId}): ${dfStr}") + val allFilesCount = snapshot.allFiles.count - val nFiltered = allFilesCount - fileStats.length + val nFiltered = allFilesCount - fileStatsSet.size val filteredPct = ((nFiltered * 1.0) / allFilesCount) * 100.0 val filteredMsg = f"${nFiltered} of ${allFilesCount} (${filteredPct}%.2f%%)" logInfo(s"Qbeast filtered files (exec id ${execId}): ${filteredMsg}") - Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats)) + + // RETURN + Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStatsSet.toSeq)) } override def inputFiles: Array[String] = { diff --git a/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala b/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala index f35b7f11e..8be9f1f17 100644 --- a/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala +++ b/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala @@ -3,11 +3,13 @@ package io.qbeast.spark.delta import io.qbeast.TestClasses.T2 import io.qbeast.core.model.Block import io.qbeast.spark.QbeastIntegrationTestSpec +import io.qbeast.spark.internal.commands.ConvertToQbeastCommand import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.files.TahoeLogFileIndex +import org.apache.spark.sql.functions.expr class OTreeIndexTest extends QbeastIntegrationTestSpec { @@ -128,4 +130,27 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec { oTreeIndex.sizeInBytes shouldBe sizeInBytes }) + it should "filter files with underlying data skipping" in withSparkAndTmpDir( + (spark, tmpdir) => { + + import spark.implicits._ + val source = Seq(1, 2, 3, 4).toDF("id") + + source.write + .format("delta") + .save(tmpdir) + + ConvertToQbeastCommand.apply(tmpdir, Seq("id"), 1000) + + val deltaLog = DeltaLog.forTable(spark, tmpdir) + val tahoeFileIndex = + TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.update(), Seq.empty, false) + val oTreeIndex = new OTreeIndexTest(tahoeFileIndex) + + val allFiles = oTreeIndex.listFiles(Seq.empty, Seq.empty) + val filteredFiles = oTreeIndex.listFiles(Seq.empty, Seq(expr("id == 1").expr)) + + allFiles.size shouldBe <(filteredFiles.size) + }) + } From 1638b56f45a768f3091c4891f40f95cb01e4c457 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 09:07:10 +0100 Subject: [PATCH 02/11] Reorganize code and tests --- .../io/qbeast/spark/delta/OTreeIndex.scala | 56 ++++++++++++------- src/test/scala/io/qbeast/TestUtils.scala | 50 +++++++++++++++++ .../qbeast/spark/delta/OTreeIndexTest.scala | 13 +++-- .../spark/utils/QbeastDeltaStagingTest.scala | 4 +- .../utils/QbeastFilterPushdownTest.scala | 38 +------------ .../spark/utils/QbeastSamplingTest.scala | 27 +-------- 6 files changed, 101 insertions(+), 87 deletions(-) create mode 100644 src/test/scala/io/qbeast/TestUtils.scala diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 072c72b50..bff66c460 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -54,23 +54,48 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { } /** - * Collect AddFiles from _delta_log and convert them into FileStatuses. 
- * The output is merged with those built from QbeastBlocks. + * Collect matching QbeastBlocks and convert them into FileStatuses. + * @param partitionFilters + * @param dataFilters * @return */ - private def deltaMatchingFiles( + private def qbeastMatchingFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileStatus] = { - - index.matchingFiles(partitionFilters, dataFilters).map { f => + matchingBlocks(partitionFilters, dataFilters).map { qbeastBlock => new FileStatus( - /* length */ f.size, + /* length */ qbeastBlock.size, /* isDir */ false, /* blockReplication */ 0, /* blockSize */ 1, - /* modificationTime */ f.modificationTime, - absolutePath(f.path)) - } + /* modificationTime */ qbeastBlock.modificationTime, + absolutePath(qbeastBlock.path)) + }.toSeq + } + + /** + * Collect matching staging files from _delta_log and convert them into FileStatuses. + * The output is merged with those built from QbeastBlocks. + * @return + */ + private def deltaMatchingFiles( + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[FileStatus] = { + + // REMOVE REPLICATED FILES + // MAP FILES TO FILE STATUS + index + .matchingFiles(partitionFilters, dataFilters) + .filter(f => f.tags.isEmpty) + .map { f => + new FileStatus( + /* length */ f.size, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ f.modificationTime, + absolutePath(f.path)) + } } override def listFiles( @@ -78,21 +103,12 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { // FILTER FILES FROM QBEAST - val qbeastFileStats = matchingBlocks(partitionFilters, dataFilters).map { qbeastBlock => - new FileStatus( - /* length */ qbeastBlock.size, - /* isDir */ false, - /* blockReplication */ 0, - /* blockSize */ 1, - /* modificationTime */ qbeastBlock.modificationTime, - absolutePath(qbeastBlock.path)) - }.toArray - + val qbeastFileStats = qbeastMatchingFiles(partitionFilters, dataFilters) // FILTER FILES FROM DELTA val deltaFileStats = deltaMatchingFiles(partitionFilters, dataFilters) // JOIN BOTH FILTERED FILES - val fileStatsSet = (qbeastFileStats ++ deltaFileStats).toSet + val fileStatsSet = (qbeastFileStats ++ deltaFileStats) val execId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) val pfStr = partitionFilters.map(f => f.toString).mkString(" ") diff --git a/src/test/scala/io/qbeast/TestUtils.scala b/src/test/scala/io/qbeast/TestUtils.scala new file mode 100644 index 000000000..4995eb5b6 --- /dev/null +++ b/src/test/scala/io/qbeast/TestUtils.scala @@ -0,0 +1,50 @@ +package io.qbeast + +import io.qbeast.spark.QbeastIntegrationTestSpec +import io.qbeast.spark.delta.OTreeIndex +import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.execution.FileSourceScanExec + +object TestUtils extends QbeastIntegrationTestSpec { + + def checkLogicalFilterPushdown(sqlFilters: Seq[String], query: DataFrame): Unit = { + val leaves = query.queryExecution.sparkPlan.collectLeaves() + + val dataFilters = leaves + .collectFirst { + case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => + f.dataFilters.filterNot(_.isInstanceOf[QbeastMurmur3Hash]) + } + .getOrElse(Seq.empty) + + val dataFiltersSql = dataFilters.map(_.sql) + sqlFilters.foreach(filter => dataFiltersSql should contain(filter)) + } + + def checkFileFiltering(query: DataFrame): Unit = { + val leaves = + 
query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec]) + + leaves should not be empty + + leaves.exists(p => + p + .asInstanceOf[FileSourceScanExec] + .relation + .location + .isInstanceOf[OTreeIndex]) shouldBe true + + leaves + .foreach { + case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => + val index = f.relation.location + val matchingFiles = + index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files) + val allFiles = index.listFiles(Seq.empty, Seq.empty).flatMap(_.files) + matchingFiles.length shouldBe <(allFiles.length) + } + + } + +} diff --git a/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala b/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala index 8be9f1f17..35a7ab67a 100644 --- a/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala +++ b/src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala @@ -136,21 +136,26 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec { import spark.implicits._ val source = Seq(1, 2, 3, 4).toDF("id") - source.write + source + .coalesce(4) + .write .format("delta") .save(tmpdir) + // CONVERT TO QBEAST ConvertToQbeastCommand.apply(tmpdir, Seq("id"), 1000) val deltaLog = DeltaLog.forTable(spark, tmpdir) + val snapshot = deltaLog.update() val tahoeFileIndex = - TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.update(), Seq.empty, false) + TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, snapshot, Seq.empty, false) val oTreeIndex = new OTreeIndexTest(tahoeFileIndex) - val allFiles = oTreeIndex.listFiles(Seq.empty, Seq.empty) + val allFiles = snapshot.allFiles val filteredFiles = oTreeIndex.listFiles(Seq.empty, Seq(expr("id == 1").expr)) - allFiles.size shouldBe <(filteredFiles.size) + allFiles.count() shouldBe 4 + filteredFiles.size shouldBe 1 }) } diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala index b49dbfadb..1797f8638 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala @@ -41,7 +41,7 @@ class QbeastDeltaStagingTest extends QbeastIntegrationTestSpec with StagingUtils val qbeastDf = spark.read.format("qbeast").load(tmpDir) val deltaDf = spark.read.format("delta").load(tmpDir) - assertLargeDatasetEquality(qbeastDf, deltaDf) + assertLargeDatasetEquality(qbeastDf, deltaDf, orderedComparison = false) // Should have the staging revision and the first revision val snapshot = DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot @@ -64,7 +64,7 @@ class QbeastDeltaStagingTest extends QbeastIntegrationTestSpec with StagingUtils val deltaDf = spark.read.format("delta").load(tmpDir) qbeastDf.count() shouldBe deltaDf.count() - assertLargeDatasetEquality(qbeastDf, deltaDf) + assertLargeDatasetEquality(qbeastDf, deltaDf, orderedComparison = false) // Should preserve standing staging revision behavior val snapshot = DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala index bd1ea183a..68ee79616 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala @@ -1,11 +1,11 @@ package io.qbeast.spark.utils +import io.qbeast.TestUtils.{checkFileFiltering, checkLogicalFilterPushdown} import 
io.qbeast.spark.QbeastIntegrationTestSpec import io.qbeast.spark.delta.OTreeIndex -import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash -import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.functions.{avg, col, rand, regexp_replace, when} +import org.scalatest.exceptions.TestFailedException class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec { @@ -16,40 +16,6 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec { private val filter_user_equal = "(user_id = 536764969)" private val filter_product_equal = "(product_id = 11522682)" - private def checkFiltersArePushedDown(query: DataFrame): Unit = { - val leaves = - query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec]) - - leaves should not be empty - - leaves.exists(p => - p - .asInstanceOf[FileSourceScanExec] - .relation - .location - .isInstanceOf[OTreeIndex]) shouldBe true - - leaves - .foreach { - case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => - f.dataFilters.nonEmpty shouldBe true - } - } - - private def checkLogicalFilterPushdown(sqlFilters: Seq[String], query: DataFrame): Unit = { - val leaves = query.queryExecution.sparkPlan.collectLeaves() - - val dataFilters = leaves - .collectFirst { - case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => - f.dataFilters.filterNot(_.isInstanceOf[QbeastMurmur3Hash]) - } - .getOrElse(Seq.empty) - - val dataFiltersSql = dataFilters.map(_.sql) - sqlFilters.foreach(filter => dataFiltersSql should contain(filter)) - } - "Qbeast" should "return a valid filtering of the original dataset " + "for one column" in withSparkAndTmpDir { (spark, tmpDir) => diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala index f5b346788..9a152268f 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala @@ -1,34 +1,11 @@ package io.qbeast.spark.utils -import io.qbeast.spark.delta.OTreeIndex +import io.qbeast.TestUtils.checkFileFiltering import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable} -import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{SparkSession} class QbeastSamplingTest extends QbeastIntegrationTestSpec { - private def checkFileFiltering(query: DataFrame): Unit = { - val leaves = query.queryExecution.executedPlan.collectLeaves() - - leaves.exists(p => - p - .asInstanceOf[FileSourceScanExec] - .relation - .location - .isInstanceOf[OTreeIndex]) shouldBe true - - leaves - .foreach { - case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => - val index = f.relation.location - val matchingFiles = - index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files) - val allFiles = index.inputFiles - matchingFiles.length shouldBe <(allFiles.length) - } - - } - "Qbeast" should "return a valid sample of the original dataset" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => From fa9667d7b73aaad2247fbcd79b54a8bd44bd01e9 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 09:47:49 +0100 Subject: [PATCH 03/11] Chaging to code on main-1.0.0 --- .../io/qbeast/spark/delta/OTreeIndex.scala | 27 +++++++++---------- src/test/scala/io/qbeast/TestUtils.scala | 2 +- .../utils/QbeastFilterPushdownTest.scala | 3 +-- 
.../spark/utils/QbeastSamplingTest.scala | 6 ++--- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index bff66c460..31087ffbb 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -62,15 +62,13 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { private def qbeastMatchingFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileStatus] = { - matchingBlocks(partitionFilters, dataFilters).map { qbeastBlock => - new FileStatus( - /* length */ qbeastBlock.size, - /* isDir */ false, - /* blockReplication */ 0, - /* blockSize */ 1, - /* modificationTime */ qbeastBlock.modificationTime, - absolutePath(qbeastBlock.path)) - }.toSeq + matchingBlocks(partitionFilters, dataFilters) + .map(block => (block.file)) + .map(file => (file.path, file)) + .toMap + .values + .map(IndexFiles.toFileStatus(index.path)) + .toSeq } /** @@ -78,12 +76,12 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { * The output is merged with those built from QbeastBlocks. * @return */ - private def deltaMatchingFiles( + private def stagingFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileStatus] = { - // REMOVE REPLICATED FILES - // MAP FILES TO FILE STATUS + // Filters only staging files (tags IS NULL) + // and maps AddFile to FileStatus index .matchingFiles(partitionFilters, dataFilters) .filter(f => f.tags.isEmpty) @@ -105,10 +103,9 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { // FILTER FILES FROM QBEAST val qbeastFileStats = qbeastMatchingFiles(partitionFilters, dataFilters) // FILTER FILES FROM DELTA - val deltaFileStats = deltaMatchingFiles(partitionFilters, dataFilters) - + val stagingFileStats = stagingFiles(partitionFilters, dataFilters) // JOIN BOTH FILTERED FILES - val fileStatsSet = (qbeastFileStats ++ deltaFileStats) + val fileStatsSet = qbeastFileStats ++ stagingFileStats val execId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) val pfStr = partitionFilters.map(f => f.toString).mkString(" ") diff --git a/src/test/scala/io/qbeast/TestUtils.scala b/src/test/scala/io/qbeast/TestUtils.scala index 4995eb5b6..9301c0882 100644 --- a/src/test/scala/io/qbeast/TestUtils.scala +++ b/src/test/scala/io/qbeast/TestUtils.scala @@ -22,7 +22,7 @@ object TestUtils extends QbeastIntegrationTestSpec { sqlFilters.foreach(filter => dataFiltersSql should contain(filter)) } - def checkFileFiltering(query: DataFrame): Unit = { + def checkFiltersArePushedDown(query: DataFrame): Unit = { val leaves = query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec]) diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala index 68ee79616..ad3db3531 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala @@ -1,11 +1,10 @@ package io.qbeast.spark.utils -import io.qbeast.TestUtils.{checkFileFiltering, checkLogicalFilterPushdown} +import io.qbeast.TestUtils.{checkFiltersArePushedDown, checkLogicalFilterPushdown} import io.qbeast.spark.QbeastIntegrationTestSpec import io.qbeast.spark.delta.OTreeIndex import org.apache.spark.sql.execution.FileSourceScanExec 
import org.apache.spark.sql.functions.{avg, col, rand, regexp_replace, when} -import org.scalatest.exceptions.TestFailedException class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec { diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala index 9a152268f..a32d42167 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala @@ -1,8 +1,8 @@ package io.qbeast.spark.utils -import io.qbeast.TestUtils.checkFileFiltering +import io.qbeast.TestUtils.checkFiltersArePushedDown import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable} -import org.apache.spark.sql.{SparkSession} +import org.apache.spark.sql.SparkSession class QbeastSamplingTest extends QbeastIntegrationTestSpec { @@ -50,7 +50,7 @@ class QbeastSamplingTest extends QbeastIntegrationTestSpec { val precision = 0.01 val query = df.sample(withReplacement = false, precision) - checkFileFiltering(query) + checkFiltersArePushedDown(query) } } From 78029838dedb9f2eb2d2de2dea9f90dccf2ea09f Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 09:53:27 +0100 Subject: [PATCH 04/11] Remove lazy val spark --- src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 31087ffbb..db00e2a37 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -33,8 +33,6 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { private def qbeastSnapshot = DeltaQbeastSnapshot(snapshot) - private lazy val spark = index.spark - protected def absolutePath(child: String): Path = { val p = new Path(new URI(child)) if (p.isAbsolute) { @@ -107,7 +105,8 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { // JOIN BOTH FILTERED FILES val fileStatsSet = qbeastFileStats ++ stagingFileStats - val execId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + val sc = index.spark.sparkContext + val execId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) val pfStr = partitionFilters.map(f => f.toString).mkString(" ") logInfo(s"OTreeIndex partition filters (exec id ${execId}): ${pfStr}") val dfStr = dataFilters.map(f => f.toString).mkString(" ") From 81f9d2e843d206971fb15af725b1229865494996 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 09:54:27 +0100 Subject: [PATCH 05/11] File stats set to seq --- src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index db00e2a37..8076f89bb 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -103,7 +103,7 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { // FILTER FILES FROM DELTA val stagingFileStats = stagingFiles(partitionFilters, dataFilters) // JOIN BOTH FILTERED FILES - val fileStatsSet = qbeastFileStats ++ stagingFileStats + val fileStats = qbeastFileStats ++ stagingFileStats val sc = index.spark.sparkContext val execId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) @@ -113,13 +113,13 @@ case class 
OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { logInfo(s"OTreeIndex data filters (exec id ${execId}): ${dfStr}") val allFilesCount = snapshot.allFiles.count - val nFiltered = allFilesCount - fileStatsSet.size + val nFiltered = allFilesCount - fileStats.size val filteredPct = ((nFiltered * 1.0) / allFilesCount) * 100.0 val filteredMsg = f"${nFiltered} of ${allFilesCount} (${filteredPct}%.2f%%)" logInfo(s"Qbeast filtered files (exec id ${execId}): ${filteredMsg}") // RETURN - Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStatsSet.toSeq)) + Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats)) } override def inputFiles: Array[String] = { From 3be90241581f9c693e0e341fac87f9eda8fbda10 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 09:55:02 +0100 Subject: [PATCH 06/11] length to size --- src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 8076f89bb..0822fadfc 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -113,7 +113,7 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { logInfo(s"OTreeIndex data filters (exec id ${execId}): ${dfStr}") val allFilesCount = snapshot.allFiles.count - val nFiltered = allFilesCount - fileStats.size + val nFiltered = allFilesCount - fileStats.length val filteredPct = ((nFiltered * 1.0) / allFilesCount) * 100.0 val filteredMsg = f"${nFiltered} of ${allFilesCount} (${filteredPct}%.2f%%)" logInfo(s"Qbeast filtered files (exec id ${execId}): ${filteredMsg}") From 7a9591bd26d8b953f33ee8cb01c8ec4baa917ed5 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 13:06:26 +0100 Subject: [PATCH 07/11] Add another check for staging files --- .../io/qbeast/spark/delta/DeltaStagingUtils.scala | 13 +++++++++++-- .../scala/io/qbeast/spark/delta/OTreeIndex.scala | 11 ++++++----- .../qbeast/spark/utils/QbeastDeltaStagingTest.scala | 4 ++-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala b/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala index 79ee82b81..4244fb355 100644 --- a/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala +++ b/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala @@ -18,7 +18,7 @@ private[spark] trait DeltaStagingUtils extends StagingUtils { /** * Condition for Staging AddFiles in the form of Spark sql Column */ - private val isStagingFile: Column = + private val isStagingFileColumn: Column = col("tags").isNull.or(col("tags.revision") === lit(stagingID.toString)) /** @@ -26,7 +26,16 @@ private[spark] trait DeltaStagingUtils extends StagingUtils { * @return */ protected def stagingFiles(): Dataset[AddFile] = { - snapshot.allFiles.where(isStagingFile) + snapshot.allFiles.where(isStagingFileColumn) } + /** + * Wether an AddFile is a staging file or not + * @param a the AddFile + * @return boolean + */ + def isStagingFile(a: AddFile): Boolean = + a.tags == null || a.tags.isEmpty || a.tags + .getOrElse("revision", "") == "0" + } diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 0822fadfc..17b958248 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ 
b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -23,7 +23,10 @@ import java.net.URI * @param index the Tahoe log file index * @param spark spark session */ -case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { +case class OTreeIndex(index: TahoeLogFileIndex) + extends FileIndex + with DeltaStagingUtils + with Logging { /** * Snapshot to analyze @@ -78,11 +81,9 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileStatus] = { - // Filters only staging files (tags IS NULL) - // and maps AddFile to FileStatus index .matchingFiles(partitionFilters, dataFilters) - .filter(f => f.tags.isEmpty) + .filter(isStagingFile) .map { f => new FileStatus( /* length */ f.size, @@ -119,7 +120,7 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging { logInfo(s"Qbeast filtered files (exec id ${execId}): ${filteredMsg}") // RETURN - Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats)) + Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats.toSeq)) } override def inputFiles: Array[String] = { diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala index 1797f8638..b49dbfadb 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala @@ -41,7 +41,7 @@ class QbeastDeltaStagingTest extends QbeastIntegrationTestSpec with StagingUtils val qbeastDf = spark.read.format("qbeast").load(tmpDir) val deltaDf = spark.read.format("delta").load(tmpDir) - assertLargeDatasetEquality(qbeastDf, deltaDf, orderedComparison = false) + assertLargeDatasetEquality(qbeastDf, deltaDf) // Should have the staging revision and the first revision val snapshot = DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot @@ -64,7 +64,7 @@ class QbeastDeltaStagingTest extends QbeastIntegrationTestSpec with StagingUtils val deltaDf = spark.read.format("delta").load(tmpDir) qbeastDf.count() shouldBe deltaDf.count() - assertLargeDatasetEquality(qbeastDf, deltaDf, orderedComparison = false) + assertLargeDatasetEquality(qbeastDf, deltaDf) // Should preserve standing staging revision behavior val snapshot = DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot From 2685a7fa5939a2486c06b7c4ef0b78ca1cc01ce5 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 13:11:05 +0100 Subject: [PATCH 08/11] Use revisionID string --- src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala b/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala index 4244fb355..86723c187 100644 --- a/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala +++ b/src/main/scala/io/qbeast/spark/delta/DeltaStagingUtils.scala @@ -36,6 +36,6 @@ private[spark] trait DeltaStagingUtils extends StagingUtils { */ def isStagingFile(a: AddFile): Boolean = a.tags == null || a.tags.isEmpty || a.tags - .getOrElse("revision", "") == "0" + .getOrElse("revision", "") == stagingID.toString } From ec8da787a8c030eaef870cee11125a77b40ad989 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 14:41:56 +0100 Subject: [PATCH 09/11] Fix tests --- src/test/scala/io/qbeast/TestUtils.scala | 20 +++++++++++++++++++ .../utils/QbeastFilterPushdownTest.scala | 2 
+- .../spark/utils/QbeastSamplingTest.scala | 4 ++-- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/test/scala/io/qbeast/TestUtils.scala b/src/test/scala/io/qbeast/TestUtils.scala index 9301c0882..1ec524bc1 100644 --- a/src/test/scala/io/qbeast/TestUtils.scala +++ b/src/test/scala/io/qbeast/TestUtils.scala @@ -28,6 +28,26 @@ object TestUtils extends QbeastIntegrationTestSpec { leaves should not be empty + leaves.exists(p => + p + .asInstanceOf[FileSourceScanExec] + .relation + .location + .isInstanceOf[OTreeIndex]) shouldBe true + + leaves + .foreach { + case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => + f.dataFilters.nonEmpty shouldBe true + } + } + + def checkFileSkipping(query: DataFrame): Unit = { + val leaves = + query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec]) + + leaves should not be empty + leaves.exists(p => p .asInstanceOf[FileSourceScanExec] diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala index ad3db3531..403749a88 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala @@ -1,6 +1,6 @@ package io.qbeast.spark.utils -import io.qbeast.TestUtils.{checkFiltersArePushedDown, checkLogicalFilterPushdown} +import io.qbeast.TestUtils._ import io.qbeast.spark.QbeastIntegrationTestSpec import io.qbeast.spark.delta.OTreeIndex import org.apache.spark.sql.execution.FileSourceScanExec diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala index a32d42167..d048025e9 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala @@ -1,6 +1,6 @@ package io.qbeast.spark.utils -import io.qbeast.TestUtils.checkFiltersArePushedDown +import io.qbeast.TestUtils.checkFileSkipping import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable} import org.apache.spark.sql.SparkSession @@ -50,7 +50,7 @@ class QbeastSamplingTest extends QbeastIntegrationTestSpec { val precision = 0.01 val query = df.sample(withReplacement = false, precision) - checkFiltersArePushedDown(query) + checkFileSkipping(query) } } From dce697f5b6a2f1ce0e376166a1ff3b38ab6015c3 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 14:48:20 +0100 Subject: [PATCH 10/11] Remove unnecessary toSeq! 
--- src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 17b958248..e3855f5ca 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -120,7 +120,7 @@ case class OTreeIndex(index: TahoeLogFileIndex) logInfo(s"Qbeast filtered files (exec id ${execId}): ${filteredMsg}") // RETURN - Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats.toSeq)) + Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats)) } override def inputFiles: Array[String] = { From 681518ef5cc49bc920476fd296613b158c55fa25 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 28 Nov 2023 14:50:15 +0100 Subject: [PATCH 11/11] Change method name --- src/test/scala/io/qbeast/TestUtils.scala | 2 +- src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/scala/io/qbeast/TestUtils.scala b/src/test/scala/io/qbeast/TestUtils.scala index 1ec524bc1..807ceca0e 100644 --- a/src/test/scala/io/qbeast/TestUtils.scala +++ b/src/test/scala/io/qbeast/TestUtils.scala @@ -42,7 +42,7 @@ object TestUtils extends QbeastIntegrationTestSpec { } } - def checkFileSkipping(query: DataFrame): Unit = { + def checkFileFiltering(query: DataFrame): Unit = { val leaves = query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec]) diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala index d048025e9..fa5d824ce 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastSamplingTest.scala @@ -1,6 +1,6 @@ package io.qbeast.spark.utils -import io.qbeast.TestUtils.checkFileSkipping +import io.qbeast.TestUtils._ import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable} import org.apache.spark.sql.SparkSession @@ -50,7 +50,7 @@ class QbeastSamplingTest extends QbeastIntegrationTestSpec { val precision = 0.01 val query = df.sample(withReplacement = false, precision) - checkFileSkipping(query) + checkFileFiltering(query) } }
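
For reference, a minimal end-to-end sketch of how the OTreeIndex behaviour introduced in these patches can be exercised. It is assembled from the OTreeIndexTest added in PATCH 01/02 and the TestUtils helpers above, not taken verbatim from any single patch: the standalone object name and temporary path are hypothetical, the call to ConvertToQbeastCommand(...).run(spark) is an assumption (the test above only constructs the command via apply), and the SparkSession is assumed to already be configured with the Delta and Qbeast SQL extensions, as QbeastIntegrationTestSpec provides for the tests.

import io.qbeast.spark.delta.OTreeIndex
import io.qbeast.spark.internal.commands.ConvertToQbeastCommand
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.functions.expr

object OTreeIndexSketch {

  def main(args: Array[String]): Unit = {
    // Assumes a session already configured with the Delta and Qbeast SQL extensions.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val tmpdir = "/tmp/otree-index-sketch" // hypothetical output path

    // Write a small Delta table (roughly one file per value) and index it on "id" with Qbeast.
    Seq(1, 2, 3, 4).toDF("id").coalesce(4).write.format("delta").save(tmpdir)
    ConvertToQbeastCommand(tmpdir, Seq("id"), 1000).run(spark) // run() is assumed to execute the conversion

    // Wrap the Delta snapshot in an OTreeIndex, as the test in these patches does.
    val deltaLog = DeltaLog.forTable(spark, tmpdir)
    val snapshot = deltaLog.update()
    val tahoeFileIndex =
      TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, snapshot, Seq.empty, false)
    val oTreeIndex = OTreeIndex(tahoeFileIndex)

    // With a data filter pushed down, listFiles should return a subset of the table's files.
    val allFiles = oTreeIndex.listFiles(Seq.empty, Seq.empty).flatMap(_.files)
    val filteredFiles =
      oTreeIndex.listFiles(Seq.empty, Seq(expr("id == 1").expr)).flatMap(_.files)
    assert(filteredFiles.length <= allFiles.length)

    spark.stop()
  }

}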