Introduce Table Property delta.dataSkippingStatsColumns #1763

Closed
18 changes: 18 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -322,6 +322,18 @@
],
"sqlState" : "0AKDC"
},
"DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_PARTITIONED_COLUMN" : {
"message" : [
"Data skipping is not supported for partition column '<column>'."
],
"sqlState" : "0AKDC"
},
"DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_TYPE" : {
"message" : [
"Data skipping is not supported for column '<column>' of type <type>."
],
"sqlState" : "0AKDC"
},
"DELTA_COLUMN_MAPPING_MAX_COLUMN_ID_NOT_SET" : {
"message" : [
"The max column id property (<prop>) is not set on a column mapping enabled table."
@@ -579,6 +591,12 @@
],
"sqlState" : "42701"
},
"DELTA_DUPLICATE_DATA_SKIPPING_COLUMNS" : {
"message" : [
"Duplicated data skipping columns found: <columns>."
],
"sqlState" : "42701"
},
"DELTA_DUPLICATE_DOMAIN_METADATA_INTERNAL_ERROR" : {
"message" : [
"Internal error: two DomainMetadata actions within the same transaction have the same domain <domainName>"
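For context, a hedged illustration of when the new error classes above are expected to surface, given the validation this PR adds to DeltaCatalog and OptimisticTransaction (table/column names and the `spark` session are hypothetical):

```scala
// Illustration only: listing a partition column in delta.dataSkippingStatsColumns
// should be rejected at table creation with an error referencing
// DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_PARTITIONED_COLUMN.
spark.sql("""
  CREATE TABLE events (id BIGINT, dt DATE) USING delta
  PARTITIONED BY (dt)
  TBLPROPERTIES ('delta.dataSkippingStatsColumns' = 'dt')
""")
```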
32 changes: 31 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -21,7 +21,7 @@ import java.util.{HashMap, Locale}
import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DataSkippingReader
import org.apache.spark.sql.delta.stats.{DataSkippingReader, StatisticsCollection}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, IntervalUtils}
@@ -485,6 +485,36 @@ trait DeltaConfigsBase extends DeltaLogging {
a => a >= -1,
"needs to be larger than or equal to -1.")

/**
* The names of specific columns to collect stats on for data skipping. If present, it takes
* precedence over the dataSkippingNumIndexedCols config, and the system will only collect stats for
* columns that exactly match those specified. If a nested column is specified, the system will
* collect stats for all leaf fields of that column. If a non-existent column is specified, it
* will be ignored. Updating this conf does not trigger stats re-collection, but redefines the
* stats schema of the table, i.e., it will change the behavior of future stats collection (e.g., in
* append and OPTIMIZE) as well as data skipping (e.g., the column stats not mentioned by this
* config will be ignored even if they exist).
*/
val DATA_SKIPPING_STATS_COLUMNS = buildConfig[Option[String]](
"dataSkippingStatsColumns",
null,
v => Option(v),
vOpt => vOpt.forall(v => StatisticsCollection.parseDeltaStatsColumnNames(v).isDefined),
"""
|The dataSkippingStatsColumns parameter is a comma-separated list of case-insensitive column
|identifiers. Each column identifier can consist of letters, digits, and underscores.
|Multiple column identifiers can be listed, separated by commas.
|
|If a column identifier includes special characters such as '!@#$%^&*()_+-={}|[]:";'<>,.?/',
|the column name should be enclosed in backticks (`) to escape the special characters.
|
|A column identifier can refer to one of the following: the name of a non-struct column, the
|name of a leaf field of a struct column, or the name of a struct column. When a struct column's
|name is specified in dataSkippingStatsColumns, statistics for all its leaf fields will be
|collected.
|""".stripMargin)


val SYMLINK_FORMAT_MANIFEST_ENABLED = buildConfig[Boolean](
s"${hooks.GenerateSymlinkManifest.CONFIG_NAME_ROOT}.enabled",
"false",
@@ -397,6 +397,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
proposedNewMetadata: Metadata,
ignoreDefaultProperties: Boolean = false): Unit = {
var newMetadataTmp = proposedNewMetadata
// Validate all indexed columns are inside table's schema.
StatisticsCollection.validateDeltaStatsColumns(newMetadataTmp)
if (readVersion == -1 || isCreatingNewTable) {
// We need to ignore the default properties when trying to create an exact copy of a table
// (as in CLONE and SHALLOW CLONE).
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DataSkippingReader
import org.apache.spark.sql.delta.stats.DeltaStatsColumnSpec
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.util.StateCache
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -87,6 +88,8 @@ class Snapshot(
/** Snapshot to scan by the DeltaScanGenerator for metadata query optimizations */
override val snapshotToScan: Snapshot = this

override def columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode


@volatile private[delta] var stateReconstructionTriggered = false

@@ -152,7 +155,8 @@
}

/** Number of columns to collect stats on for data skipping */
lazy val numIndexedCols: Int = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)
override lazy val statsColumnSpec: DeltaStatsColumnSpec =
StatisticsCollection.configuredDeltaStatsColumnSpec(metadata)

/** Performs validations during initialization */
protected def init(): Unit = {
@@ -328,7 +332,11 @@
allFiles = checksumOpt.flatMap(_.allFiles))

/** Returns the data schema of the table, used for reading stats */
def tableDataSchema: StructType = metadata.dataSchema
def tableSchema: StructType = metadata.dataSchema

def outputTableStatsSchema: StructType = metadata.dataSchema

def outputAttributeSchema: StructType = metadata.dataSchema

/** Returns the schema of the columns written out to file (overridden in write path) */
def dataSchema: StructType = metadata.dataSchema
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
@@ -104,7 +105,9 @@ class DeltaCatalog extends DelegatingCatalogExtension
var newPartitionColumns = partitionColumns
var newBucketSpec = maybeBucketSpec
val conf = spark.sessionState.conf

allTableProperties.asScala
.get(DeltaConfigs.DATA_SKIPPING_STATS_COLUMNS.key)
.foreach(StatisticsCollection.validateDeltaStatsColumns(schema, partitionColumns, _))
val isByPath = isPathIdentifier(ident)
if (isByPath && !conf.getConf(DeltaSQLConf.DELTA_LEGACY_ALLOW_AMBIGUOUS_PATHS)
&& allTableProperties.containsKey("location")
@@ -71,7 +71,7 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaComman
val df = spark.createDataFrame(new java.util.ArrayList[Row](), dataSchema)
val checkColStat = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK)
val statCollectionSchema = txn.snapshot.statCollectionSchema
val statCollectionSchema = txn.snapshot.statCollectionLogicalSchema
val colsWithoutStats = ArrayBuffer[String]()

unresolvedZOrderByCols.foreach { colAttribute =>
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraint
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.StatisticsCollection

import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
@@ -119,6 +120,7 @@ case class AlterTableSetPropertiesDeltaCommand(
case _ =>
true
}

val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
configuration = metadata.configuration ++ filteredConfs)
@@ -310,9 +312,13 @@ case class AlterTableDropColumnsDeltaCommand(
if (droppingPartitionCols.nonEmpty) {
throw DeltaErrors.dropPartitionColumnNotSupported(droppingPartitionCols)
}

val newMetadata = metadata.copy(schemaString = newSchema.json)

// Updates the delta statistics column list by removing the dropped columns from it.
val newConfiguration = metadata.configuration ++
StatisticsCollection.dropDeltaStatsColumns(metadata, columnsToDrop)
val newMetadata = metadata.copy(
schemaString = newSchema.json,
configuration = newConfiguration
)
columnsToDrop.foreach { columnParts =>
checkDependentExpressions(sparkSession, columnParts, newMetadata, txn.protocol, "drop")
}
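For intuition, a simplified sketch of the pruning described in the comment above; this is not the actual StatisticsCollection.dropDeltaStatsColumns implementation (which also handles backticked and case-insensitive identifiers), just the intended effect on a plain comma-separated value. AlterTableChangeColumnDeltaCommand below applies the analogous rewrite for renames via StatisticsCollection.renameDeltaStatsColumn.

```scala
// Sketch: dropping a column removes it, and any of its leaf fields, from the
// configured delta.dataSkippingStatsColumns value.
def pruneStatsColumns(statsColumns: String, droppedColumn: Seq[String]): String = {
  val droppedPath = droppedColumn.mkString(".")
  statsColumns.split(",").map(_.trim)
    .filterNot(col => col == droppedPath || col.startsWith(droppedPath + "."))
    .mkString(",")
}

// e.g. dropping Seq("payload") turns "id,payload.country,ts" into "id,ts"
assert(pruneStatsColumns("id,payload.country,ts", Seq("payload")) == "id,ts")
```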
@@ -397,8 +403,17 @@ case class AlterTableChangeColumnDeltaCommand(
}
} else metadata.partitionColumns

val oldColumnPath = columnPath :+ columnName
val newColumnPath = columnPath :+ newColumn.name
// Rename the column in the delta statistics columns configuration, if present.
val newConfiguration = metadata.configuration ++
StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)

val newMetadata = metadata.copy(
schemaString = newSchema.json, partitionColumns = newPartitionColumns)
schemaString = newSchema.json,
partitionColumns = newPartitionColumns,
configuration = newConfiguration
)

if (newColumn.name != columnName) {
// need to validate the changes if the column is renamed
@@ -411,9 +426,7 @@

if (newColumn.name != columnName) {
// record column rename separately
txn.commit(Nil, DeltaOperations.RenameColumn(
columnPath :+ columnName,
columnPath :+ newColumn.name))
txn.commit(Nil, DeltaOperations.RenameColumn(oldColumnPath, newColumnPath))
} else {
txn.commit(Nil, DeltaOperations.ChangeColumn(
columnPath, columnName, newColumn, colPosition.map(_.toString)))
@@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInv
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_COLLECT_STATS_USING_TABLE_SCHEMA
import org.apache.spark.sql.delta.stats.{DeltaJobStatisticsTracker, StatisticsCollection}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path
@@ -241,24 +242,25 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
}

/**
* Return a tuple of (statsDataSchema, statsCollectionSchema).
* statsDataSchema is the data source schema from DataFrame used for stats collection. It
* contains the columns in the DataFrame output, excluding the partition columns.
* statsCollectionSchema is the schema to collect stats for. It contains the columns in the
* Return a tuple of (outputStatsCollectionSchema, tableStatsCollectionSchema).
* outputStatsCollectionSchema is the data source schema from the DataFrame used for stats collection.
* It contains the columns in the DataFrame output, excluding the partition columns.
* tableStatsCollectionSchema is the schema to collect stats for. It contains the columns in the
* table schema, excluding the partition columns.
* Note: We only collect NULL_COUNT stats (as the number of rows) for the columns in
* statsCollectionSchema but missing in statsDataSchema
* tableStatsCollectionSchema but missing in outputStatsCollectionSchema
*/
protected def getStatsSchema(
dataFrameOutput: Seq[Attribute],
partitionSchema: StructType): (Seq[Attribute], Seq[Attribute]) = {
val partitionColNames = partitionSchema.map(_.name).toSet

// statsDataSchema comes from DataFrame output
// The outputStatsCollectionSchema comes from DataFrame output
// schema should be normalized, therefore we can do an equality check
val statsDataSchema = dataFrameOutput.filterNot(c => partitionColNames.contains(c.name))
val outputStatsCollectionSchema = dataFrameOutput
.filterNot(c => partitionColNames.contains(c.name))

// statsCollectionSchema comes from table schema
// The tableStatsCollectionSchema comes from table schema
val statsTableSchema = metadata.schema.toAttributes
val mappedStatsTableSchema = if (metadata.columnMappingMode == NoMapping) {
statsTableSchema
@@ -267,10 +269,10 @@
}

// It's important to first do the column mapping and then drop the partition columns
val filteredStatsTableSchema = mappedStatsTableSchema
val tableStatsCollectionSchema = mappedStatsTableSchema
.filterNot(c => partitionColNames.contains(c.name))

(statsDataSchema, filteredStatsTableSchema)
(outputStatsCollectionSchema, tableStatsCollectionSchema)
}
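A tiny worked example of the two schemas (hypothetical column names), matching the doc comment above:

```scala
import org.apache.spark.sql.types._

// Table schema: a, b, part; partition column: part; the incoming DataFrame
// writes only a and part.
val tableSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("part", DateType)))
val dataFrameOutput = StructType(Seq(
  StructField("a", IntegerType),
  StructField("part", DateType)))
val partitionColNames = Set("part")

// outputStatsCollectionSchema: DataFrame output minus partition columns -> (a)
val outputStatsCollectionSchema = dataFrameOutput.filterNot(f => partitionColNames.contains(f.name))
// tableStatsCollectionSchema: table schema minus partition columns -> (a, b)
val tableStatsCollectionSchema = tableSchema.filterNot(f => partitionColNames.contains(f.name))
// Column b is in the table schema but absent from the DataFrame output, so only
// its NULL_COUNT stat is collected, equal to the number of written rows.
```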

protected def getStatsColExpr(
@@ -291,33 +293,33 @@
Option[StatisticsCollection]) = {
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_STATS)) {

val (statsDataSchema, statsCollectionSchema) = getStatsSchema(output, partitionSchema)

val indexedCols = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)
val (outputStatsCollectionSchema, tableStatsCollectionSchema) =
getStatsSchema(output, partitionSchema)

val statsCollection = new StatisticsCollection {
override def tableDataSchema = {
// If collecting stats using the table schema, then pass in statsCollectionSchema.
// Otherwise pass in statsDataSchema to collect stats using the DataFrame schema.
if (spark.sessionState.conf.getConf(DeltaSQLConf
.DELTA_COLLECT_STATS_USING_TABLE_SCHEMA)) {
statsCollectionSchema.toStructType
override val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
override def tableSchema: StructType = metadata.schema
override def outputTableStatsSchema: StructType = {
// If collecting stats uses the table schema, then we pass in tableStatsCollectionSchema;
// otherwise, pass in outputStatsCollectionSchema to collect stats using the DataFrame
// schema.
if (spark.sessionState.conf.getConf(DELTA_COLLECT_STATS_USING_TABLE_SCHEMA)) {
tableStatsCollectionSchema.toStructType
} else {
statsDataSchema.toStructType
outputStatsCollectionSchema.toStructType
}
}
override def dataSchema = statsDataSchema.toStructType
override def outputAttributeSchema: StructType = outputStatsCollectionSchema.toStructType
override val spark: SparkSession = data.sparkSession
override val numIndexedCols = indexedCols
override val statsColumnSpec = StatisticsCollection.configuredDeltaStatsColumnSpec(metadata)
override val protocol: Protocol = newProtocol.getOrElse(snapshot.protocol)
}

val statsColExpr = getStatsColExpr(statsDataSchema, statsCollection)
val statsColExpr = getStatsColExpr(outputStatsCollectionSchema, statsCollection)

(Some(new DeltaJobStatisticsTracker(
deltaLog.newDeltaHadoopConf(),
outputPath,
statsDataSchema,
outputStatsCollectionSchema,
statsColExpr)), Some(statsCollection))
} else {
(None, None)