Add Unit Tests:
- table with DVs
- empty table
- table with a few AddFiles having zero rows

Signed-off-by: Felipe Fujiy Pessoto <fepessot@microsoft.com>
felipepessoto committed Jan 5, 2024
1 parent 2c70c6b commit a36413c
Showing 2 changed files with 74 additions and 6 deletions.
@@ -137,8 +137,6 @@ trait OptimizeMetadataOnlyDeltaQuery {
   private def extractCountMinMaxFromStats(
       deltaScanGenerator: DeltaScanGenerator,
       lowerCaseColumnNames: Set[String]): (Option[Long], Map[String, DeltaColumnStat]) = {
-    // TODO Update this to work with DV (https://github.com/delta-io/delta/issues/1485)
-
     val snapshot = deltaScanGenerator.snapshotToScan
 
     // Count - account for deleted rows according to deletion vectors
@@ -170,7 +168,7 @@ trait OptimizeMetadataOnlyDeltaQuery {
     lazy val files = filesWithStatsForScan.filter(col("stats.numRecords") > 0)
     lazy val statsMinMaxNullColumns = files.select(col("stats.*"))
     if (dataColumns.isEmpty
-      || !isTableDVFree(snapshot)
+      || !isTableDVFree(snapshot) // When DV enabled we can't rely on stats values easily
       || numFiles == 0
       || !statsMinMaxNullColumns.columns.contains("minValues")
       || !statsMinMaxNullColumns.columns.contains("maxValues")
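The count path kept above subtracts rows deleted via deletion vectors, while the min/max path now bails out whenever a file carries a DV. A minimal sketch of the counting idea, using a hypothetical FileEntry stand-in for the per-file stats rather than Delta's actual AddFile type:

    // Hypothetical model of per-file metadata: numRecords comes from the file's
    // stats, dvCardinality is how many of those rows the deletion vector removes.
    final case class FileEntry(numRecords: Long, dvCardinality: Long)

    // Logical row count answered purely from the log: physical rows written,
    // minus rows masked out by each file's deletion vector.
    def logicalRowCount(files: Seq[FileEntry]): Long =
      files.map(f => f.numRecords - f.dvCardinality).sum

Min/max gets no such correction, because stats alone cannot say which rows a DV removed; hence the isTableDVFree guard above.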
@@ -18,11 +18,12 @@ package org.apache.spark.sql.delta.perf
 
 import scala.collection.mutable
 import io.delta.tables.DeltaTable
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.delta.{DeltaColumnMappingEnableIdMode,
-  DeltaColumnMappingEnableNameMode, DeltaLog, DeltaTestUtils}
+  DeltaColumnMappingEnableNameMode, DeletionVectorsTestUtils, DeltaLog, DeltaTestUtils}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.PrepareDeltaScanBase
@@ -38,7 +39,8 @@ class OptimizeMetadataOnlyDeltaQuerySuite
   extends QueryTest
     with SharedSparkSession
     with BeforeAndAfterAll
-    with DeltaSQLCommandTest {
+    with DeltaSQLCommandTest
+    with DeletionVectorsTestUtils {
   val testTableName = "table_basic"
   val noStatsTableName = " table_nostats"
   val mixedStatsTableName = " table_mixstats"
@@ -515,7 +517,7 @@ class OptimizeMetadataOnlyDeltaQuerySuite
 
   test("count-min-max - external table") {
     withTempDir { dir =>
-      val testTablePath = dir.getAbsolutePath
+      val testTablePath = dir.getCanonicalPath
       dfPart1.write.format("delta").mode("overwrite").save(testTablePath)
       DeltaTable.forPath(spark, testTablePath).delete("id = 1")
      dfPart2.write.format("delta").mode(SaveMode.Append).save(testTablePath)
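The switch from getAbsolutePath to getCanonicalPath is presumably about symlinked temp directories (an assumption; the commit message does not say): canonicalization resolves symlinks, so the test's path matches the resolved paths recorded in the Delta log. A small illustration:

    import java.io.File

    // On macOS, /tmp is a symlink to /private/tmp, so the two calls differ.
    val dir = new File("/tmp")
    println(dir.getAbsolutePath)   // /tmp
    println(dir.getCanonicalPath)  // /private/tmp (symlinks resolved)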
@@ -715,6 +717,50 @@ class OptimizeMetadataOnlyDeltaQuerySuite
       assert(firstRow.getString(1) === "NULL")
   }
 
+  test("count - dv-enabled") {
+    withTempDir { dir =>
+      val tempPath = dir.getCanonicalPath
+      spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
+
+      enableDeletionVectorsInTable(new Path(tempPath), true)
+      DeltaTable.forPath(spark, tempPath).delete("id = 1")
+      assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
+
+      checkResultsAndOptimizedPlan(
+        s"SELECT COUNT(*) FROM delta.`$tempPath`",
+        "LocalRelation [none#0L]")
+    }
+  }
+
+  test("count - zero rows AddFile") {
+    withTempDir { dir =>
+      val tempPath = dir.getCanonicalPath
+      val df = spark.range(1, 10)
+      val expectedResult = df.count()
+      df.write.format("delta").save(tempPath)
+
+      // Creates AddFile entries with non-existing files
+      // The query should read only the delta log and not the parquet files
+      val log = DeltaLog.forTable(spark, tempPath)
+      val txn = log.startTransaction()
+      txn.commitManually(
+        DeltaTestUtils.createTestAddFile(path = "1.parquet", stats = "{\"numRecords\": 0}"),
+        DeltaTestUtils.createTestAddFile(path = "2.parquet", stats = "{\"numRecords\": 0}"),
+        DeltaTestUtils.createTestAddFile(path = "3.parquet", stats = "{\"numRecords\": 0}"))
+
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+        val queryDf = spark.sql(s"SELECT COUNT(*) FROM delta.`$tempPath`")
+        val optimizedPlan = queryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+        assert(queryDf.head().getLong(0) === expectedResult)
+
+        assertResult("LocalRelation [none#0L]") {
+          optimizedPlan.trim
+        }
+      }
+    }
+  }
+
   // Tests to validate the optimizer won't incorrectly change queries it can't correctly handle
 
   Seq((s"SELECT COUNT(*) FROM $mixedStatsTableName", "missing stats"),
@@ -777,6 +823,30 @@ class OptimizeMetadataOnlyDeltaQuerySuite
       s"SELECT MAX(Column2) FROM $tableName")
   }
 
+  // For empty tables the stats won't be found and the query should not be optimized
+  test("optimization not supported - min-max empty table") {
+    val tableName = "TestMinMaxEmptyTable"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+
+    checkOptimizationIsNotTriggered(
+      s"SELECT MIN(Column1), MAX(Column1) FROM $tableName")
+  }
+
+  test("optimization not supported - min-max dv-enabled") {
+    withTempDir { dir =>
+      val tempPath = dir.getCanonicalPath
+      spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
+      val querySql = s"SELECT MIN(id), MAX(id) FROM delta.`$tempPath`"
+      checkResultsAndOptimizedPlan(querySql, "LocalRelation [none#0L, none#1L]")
+
+      enableDeletionVectorsInTable(new Path(tempPath), true)
+      DeltaTable.forPath(spark, tempPath).delete("id = 1")
+      assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
+      checkOptimizationIsNotTriggered(querySql)
+    }
+  }
+
   test("optimization not supported - filter on partitioned column") {
     val tableName = "TestPartitionedFilter"
 
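The last test pins down why min/max must stay unoptimized under DVs: per-file stats describe the physical parquet contents, and a deletion vector can remove exactly the row that held the extremum. A tiny self-contained illustration:

    // Stats for this file would record min = 1, max = 9, but the DV masks the
    // row holding 9, so the true MAX over visible rows is 5.
    val physicalRows = Seq(1L, 5L, 9L)   // rows written to the parquet file
    val deletedRows  = Set(9L)           // rows the deletion vector removes
    val visibleRows  = physicalRows.filterNot(deletedRows)
    assert(visibleRows.max == 5L)        // the stats max (9) would be wrong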
