Skip to content

Commit

Permalink
Refactor to address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Felipe Fujiy Pessoto <fepessot@microsoft.com>
  • Loading branch information
felipepessoto committed Jan 5, 2024
1 parent a36413c commit ce394fb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import java.util.Locale
* This optimization is only applied when the following conditions are met:
* - The MIN/MAX columns data type is supported by the optimization (ByteType, ShortType,
* IntegerType, LongType, FloatType, DoubleType, DateType).
* - All AddFiles in the Delta Log must have stats on columns used in MIN/MAX expressions,
* or the columns must be partitioned, in the latter case it uses partitionValues, a required field.
* - Table has no deletion vectors, or query has no MIN/MAX expressions.
* - COUNT has no DISTINCT.
* - Query has no filters.
Expand All @@ -53,8 +55,8 @@ trait OptimizeMetadataOnlyDeltaQuery {
protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator

private def createLocalRelationPlan(
plan: Aggregate,
tahoeLogFileIndex: TahoeLogFileIndex): LogicalPlan = {
plan: Aggregate,
tahoeLogFileIndex: TahoeLogFileIndex): LogicalPlan = {

val aggColumnsNames = Set(extractMinMaxFieldNames(plan).map(_.toLowerCase(Locale.ROOT)) : _*)
val (rowCount, columnStats) = extractCountMinMaxFromDeltaLog(tahoeLogFileIndex, aggColumnsNames)
Expand Down Expand Up @@ -135,8 +137,8 @@ trait OptimizeMetadataOnlyDeltaQuery {
case class DeltaColumnStat(min: Any, max: Any)

private def extractCountMinMaxFromStats(
deltaScanGenerator: DeltaScanGenerator,
lowerCaseColumnNames: Set[String]): (Option[Long], Map[String, DeltaColumnStat]) = {
deltaScanGenerator: DeltaScanGenerator,
lowerCaseColumnNames: Set[String]): (Option[Long], Map[String, DeltaColumnStat]) = {
val snapshot = deltaScanGenerator.snapshotToScan

// Count - account for deleted rows according to deletion vectors
Expand All @@ -160,28 +162,33 @@ trait OptimizeMetadataOnlyDeltaQuery {
lazy val numFiles: Long = filesStatsCount.getAs[Long]("countNonEmptyFiles")

val dataColumns = snapshot.statCollectionPhysicalSchema.filter(col =>
AggregateDeltaTable.isSupportedDataType(col.dataType) &&
lowerCaseColumnNames.contains(col.name.toLowerCase(Locale.ROOT)))

// DELETE operations creates AddFile records with 0 rows, and no column stats.
// We can safely ignore it since there is no data.
lazy val files = filesWithStatsForScan.filter(col("stats.numRecords") > 0)
lazy val statsMinMaxNullColumns = files.select(col("stats.*"))

val minColName = "minValues"
val maxColName = "maxValues"
val nullColName = "nullCount"

if (dataColumns.isEmpty
|| dataColumns.size != lowerCaseColumnNames.size
|| !isTableDVFree(snapshot) // When DV enabled we can't rely on stats values easily
|| numFiles == 0
|| !statsMinMaxNullColumns.columns.contains("minValues")
|| !statsMinMaxNullColumns.columns.contains("maxValues")
|| !statsMinMaxNullColumns.columns.contains("nullCount")) {
|| !statsMinMaxNullColumns.columns.contains(minColName)
|| !statsMinMaxNullColumns.columns.contains(maxColName)
|| !statsMinMaxNullColumns.columns.contains(nullColName)) {
return (Some(numRecords), Map.empty)
}

// dataColumns can contain columns without stats if dataSkippingNumIndexedCols
// has been increased
val columnsWithStats = files.select(
col("stats.minValues.*"),
col("stats.maxValues.*"),
col("stats.nullCount.*"))
col(s"stats.$minColName.*"),
col(s"stats.$maxColName.*"),
col(s"stats.$nullColName.*"))
.columns.groupBy(identity).mapValues(_.size)
.filter(x => x._2 == 3) // 3: minValues, maxValues, nullCount
.map(x => x._1).toSet
Expand All @@ -194,29 +201,29 @@ trait OptimizeMetadataOnlyDeltaQuery {
val dataType = columnAndPhysicalName._1.dataType
val physicalName = columnAndPhysicalName._2

Seq(col(s"stats.minValues.`$physicalName`").cast(dataType).as(s"min.$physicalName"),
col(s"stats.maxValues.`$physicalName`").cast(dataType).as(s"max.$physicalName"),
col(s"stats.nullCount.`$physicalName`").as(s"nullCount.$physicalName"))
Seq(col(s"stats.$minColName.`$physicalName`").cast(dataType).as(s"min.$physicalName"),
col(s"stats.$maxColName.`$physicalName`").cast(dataType).as(s"max.$physicalName"),
col(s"stats.$nullColName.`$physicalName`").as(s"null_count.$physicalName"))
} ++ Seq(col(s"stats.numRecords").as(s"numRecords"))

val minMaxExpr = dataColumnsWithStats.flatMap { columnAndPhysicalName =>
val physicalName = columnAndPhysicalName._2

// To validate if the column has stats we do two validation:
// 1-) COUNT(nullCount.columnName) should be equals to numFiles,
// since nullCount is always non-null.
// 1-) COUNT(null_count.columnName) should be equals to numFiles,
// since null_count is always non-null.
// 2-) The number of files with non-null min/max:
// a. count(min.columnName)|count(max.columnName) +
// the number of files where all rows are NULL:
// b. count of (ISNULL(min.columnName) and nullCount.columnName == numRecords)
// b. count of (ISNULL(min.columnName) and null_count.columnName == numRecords)
// should be equals to numFiles
Seq(
s"""case when $numFiles = count(`nullCount.$physicalName`)
s"""case when $numFiles = count(`null_count.$physicalName`)
| AND $numFiles = (count(`min.$physicalName`) + sum(case when
| ISNULL(`min.$physicalName`) and `nullCount.$physicalName` = numRecords
| ISNULL(`min.$physicalName`) and `null_count.$physicalName` = numRecords
| then 1 else 0 end))
| AND $numFiles = (count(`max.$physicalName`) + sum(case when
| ISNULL(`max.$physicalName`) AND `nullCount.$physicalName` = numRecords
| ISNULL(`max.$physicalName`) AND `null_count.$physicalName` = numRecords
| then 1 else 0 end))
| then TRUE else FALSE end as `complete_$physicalName`""".stripMargin,
s"min(`min.$physicalName`) as `min_$physicalName`",
Expand All @@ -238,12 +245,11 @@ trait OptimizeMetadataOnlyDeltaQuery {
}

private def extractMinMaxFromPartitionValue(
snapshot: Snapshot,
lowerCaseColumnNames: Set[String]): Map[String, DeltaColumnStat] = {
snapshot: Snapshot,
lowerCaseColumnNames: Set[String]): Map[String, DeltaColumnStat] = {

val partitionedColumns = snapshot.metadata.partitionSchema
.filter(col => AggregateDeltaTable.isSupportedDataType(col.dataType) &&
lowerCaseColumnNames.contains(col.name.toLowerCase(Locale.ROOT)))
.filter(col => lowerCaseColumnNames.contains(col.name.toLowerCase(Locale.ROOT)))
.map(col => (col, DeltaColumnMapping.getPhysicalName(col)))

if (partitionedColumns.isEmpty) {
Expand Down Expand Up @@ -285,21 +291,20 @@ trait OptimizeMetadataOnlyDeltaQuery {
* If the column is partitioned, the values are extracted from partitionValues.
*/
private def extractCountMinMaxFromDeltaLog(
tahoeLogFileIndex: TahoeLogFileIndex,
lowerCaseColumnNames: Set[String]):
tahoeLogFileIndex: TahoeLogFileIndex,
lowerCaseColumnNames: Set[String]):
(Option[Long], CaseInsensitiveMap[DeltaColumnStat]) = {
val deltaScanGen = getDeltaScanGenerator(tahoeLogFileIndex)
val (rowCount, columnStats) = extractCountMinMaxFromStats(deltaScanGen, lowerCaseColumnNames)

val minMaxValues = if (lowerCaseColumnNames.equals(columnStats.keySet)) {
CaseInsensitiveMap(columnStats)
} else {
CaseInsensitiveMap(
columnStats.++
(extractMinMaxFromPartitionValue(deltaScanGen.snapshotToScan, lowerCaseColumnNames)))
}
val partitionedValues = extractMinMaxFromPartitionValue(
deltaScanGen.snapshotToScan,
lowerCaseColumnNames)

val partitionedColNames = partitionedValues.keySet.map(_.toLowerCase(Locale.ROOT))
val dataColumnNames = lowerCaseColumnNames -- partitionedColNames
val (rowCount, columnStats) = extractCountMinMaxFromStats(deltaScanGen, dataColumnNames)

(rowCount, minMaxValues)
(rowCount, CaseInsensitiveMap(columnStats ++ partitionedValues))
}

object AggregateDeltaTable {
Expand All @@ -313,7 +318,7 @@ trait OptimizeMetadataOnlyDeltaQuery {
}

private def getAggFunctionOptimizable(
aggExpr: AggregateExpression): Option[DeclarativeAggregate] = {
aggExpr: AggregateExpression): Option[DeclarativeAggregate] = {

aggExpr match {
case AggregateExpression(
Expand All @@ -329,7 +334,7 @@ trait OptimizeMetadataOnlyDeltaQuery {
}
}

private def isStatsOptimizable(aggExpr: Seq[Alias]): Boolean = aggExpr.forall {
private def isStatsOptimizable(aggExprs: Seq[Alias]): Boolean = aggExprs.forall {
case Alias(aggExpr: AggregateExpression, _) => getAggFunctionOptimizable(aggExpr).isDefined
case Alias(ToPrettyString(aggExpr: AggregateExpression, _), _) =>
getAggFunctionOptimizable(aggExpr).isDefined
Expand All @@ -348,19 +353,19 @@ trait OptimizeMetadataOnlyDeltaQuery {
def unapply(plan: Aggregate): Option[TahoeLogFileIndex] = plan match {
case Aggregate(
Nil, // GROUP BY not supported
aggExpr: Seq[Alias @unchecked], // Underlying type is not checked because of type erasure.
aggExprs: Seq[Alias @unchecked], // Underlying type is not checked because of type erasure.
// Alias type check is done in isStatsOptimizable.
PhysicalOperation(fields, Nil, DeltaTable(fileIndex: TahoeLogFileIndex)))
if fileIndex.partitionFilters.isEmpty &&
fieldsAreAttributeReference(fields) &&
isStatsOptimizable(aggExpr) => Some(fileIndex)
isStatsOptimizable(aggExprs) => Some(fileIndex)
case Aggregate(
Nil,
aggExpr: Seq[Alias @unchecked],
aggExprs: Seq[Alias @unchecked],
// When all columns are selected, there are no Project/PhysicalOperation
DeltaTable(fileIndex: TahoeLogFileIndex))
if fileIndex.partitionFilters.isEmpty &&
isStatsOptimizable(aggExpr) => Some(fileIndex)
isStatsOptimizable(aggExprs) => Some(fileIndex)
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ class OptimizeMetadataOnlyDeltaQuerySuite
spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
val querySql = s"SELECT MIN(id), MAX(id) FROM delta.`$tempPath`"
checkResultsAndOptimizedPlan(querySql, "LocalRelation [none#0L, none#1L]")

enableDeletionVectorsInTable(new Path(tempPath), true)
DeltaTable.forPath(spark, tempPath).delete("id = 1")
assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
Expand Down

0 comments on commit ce394fb

Please sign in to comment.