From 07088ca8db3fdc357f3c4ebf8d4d431b97f25a9e Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Wed, 10 Jul 2024 16:11:53 -0700 Subject: [PATCH 01/10] initial commit --- .../resources/error/error-conditions.json | 14 +++++ .../sql/catalyst/catalog/interface.scala | 19 ++++++ .../sql/errors/QueryCompilationErrors.scala | 30 ++++++++++ .../connector/catalog/InMemoryBaseTable.scala | 4 ++ .../apache/spark/sql/DataFrameWriter.scala | 60 +++++++++++++++++-- .../apache/spark/sql/DataFrameWriterV2.scala | 34 ++++++++++- .../datasources/DataSourceUtils.scala | 5 ++ .../sql/execution/datasources/rules.scala | 12 ++++ .../spark/sql/DataFrameWriterV2Suite.scala | 50 +++++++++++++++- .../sql/execution/command/DDLSuite.scala | 59 ++++++++++++++++++ .../command/DescribeTableSuiteBase.scala | 51 ++++++++++++++++ .../sql/test/DataFrameReaderWriterSuite.scala | 18 +++++- 12 files changed, 347 insertions(+), 9 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a7027e89e187..e72aaaf06f35 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -471,6 +471,20 @@ ], "sqlState" : "0A000" }, + "CLUSTERING_COLUMNS_MISMATCH" : { + "message" : [ + "Specified clustering does not match that of the existing table .", + "Specified clustering columns: [].", + "Existing clustering columns: []." + ], + "sqlState" : "42P10" + }, + "CLUSTERING_NOT_SUPPORTED" : { + "message" : [ + "'' does not support clustering." + ], + "sqlState" : "42000" + }, "CODEC_NOT_AVAILABLE" : { "message" : [ "The codec is not available." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c281b0df8a6d..5030d9264bd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -209,10 +209,25 @@ object ClusterBySpec { normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson } + /** + * Converts a ClusterBySpec to a map of table properties used to store the clustering + * information in the table catalog. + * + * @param clusterBySpec : existing ClusterBySpec to be converted to properties. 
+ */ + def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = { + val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames)) + Map(CatalogTable.PROP_CLUSTERING_COLUMNS -> columnValue) + } + private def normalizeClusterBySpec( schema: StructType, clusterBySpec: ClusterBySpec, resolver: Resolver): ClusterBySpec = { + if (schema.isEmpty) { + return clusterBySpec + } + val normalizedColumns = clusterBySpec.columnNames.map { columnName => val position = SchemaUtils.findColumnPosition( columnName.fieldNames().toImmutableArraySeq, schema, resolver) @@ -239,6 +254,10 @@ object ClusterBySpec { val normalizedClusterBySpec = normalizeClusterBySpec(schema, clusterBySpec, resolver) ClusterByTransform(normalizedClusterBySpec.columnNames) } + + def fromColumnNames(names: Seq[String]): ClusterBySpec = { + ClusterBySpec(names.map(FieldReference(_))) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 73a98f9fe4be..75a9fdb1a6be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1866,6 +1866,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "existingBucketString" -> existingBucketString)) } + def mismatchedTableClusteringError( + tableName: String, + specifiedClusteringString: String, + existingClusteringString: String): Throwable = { + new AnalysisException( + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + messageParameters = Map( + "tableName" -> tableName, + "specifiedClusteringString" -> specifiedClusteringString, + "existingClusteringString" -> existingClusteringString)) + } + def specifyPartitionNotAllowedWhenTableSchemaNotDefinedError(): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1165", @@ -4108,4 +4120,22 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("functionName" -> functionName) ) } + + def operationNotSupportClusteringError(operation: String): Throwable = { + new AnalysisException( + errorClass = "CLUSTERING_NOT_SUPPORTED", + messageParameters = Map("operation" -> operation)) + } + + def clusterByWithPartitionedBy(): Throwable = { + new AnalysisException( + errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED", + messageParameters = Map.empty) + } + + def clusterByWithBucketing(): Throwable = { + new AnalysisException( + errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED", + messageParameters = Map.empty) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 505a5a616920..852e39931626 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -194,6 +194,10 @@ abstract class InMemoryBaseTable( case (v, t) => throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)") } + case ClusterByTransform(columnNames) => + columnNames.map { colName => + extractor(colName.fieldNames, cleanedSchema, row)._1 + } }.toImmutableArraySeq } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 2d6d5f0e8b2b..82a04a96b09e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ -import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} +import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.DDLUtils @@ -193,6 +193,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { @scala.annotation.varargs def partitionBy(colNames: String*): DataFrameWriter[T] = { this.partitioningColumns = Option(colNames) + validatePartitioning() this } @@ -210,6 +211,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T] = { this.numBuckets = Option(numBuckets) this.bucketColumnNames = Option(colName +: colNames) + validatePartitioning() + this + } + + /** + * Clusters the data by the given columns to optimize query performance. + * + * @since 4.0 + */ + @scala.annotation.varargs + def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = { + this.clusteringColumns = Option(colName +: colNames) + validatePartitioning() this } @@ -377,6 +391,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { DataSourceUtils.PARTITIONING_COLUMNS_KEY -> DataSourceUtils.encodePartitioningColumns(columns)) } + clusteringColumns.foreach { columns => + extraOptions = extraOptions + ( + DataSourceUtils.CLUSTERING_COLUMNS_KEY -> + DataSourceUtils.encodePartitioningColumns(columns)) + } val optionsWithPath = getOptionsWithPath(path) @@ -515,6 +534,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } } + private def assertNotClustered(operation: String): Unit = { + if (clusteringColumns.isDefined) { + throw QueryCompilationErrors.operationNotSupportClusteringError(operation) + } + } + /** * Saves the content of the `DataFrame` as the specified table. 
* @@ -688,6 +713,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { CatalogTableType.MANAGED } + val properties = if (clusteringColumns.isEmpty) { + Map.empty[String, String] + } else { + ClusterBySpec.toProperties(ClusterBySpec.fromColumnNames(clusteringColumns.get)) + } + val tableDesc = CatalogTable( identifier = tableIdent, tableType = tableType, @@ -695,7 +726,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { schema = new StructType, provider = Some(source), partitionColumnNames = partitioningColumns.getOrElse(Nil), - bucketSpec = getBucketSpec) + bucketSpec = getBucketSpec, + properties = properties) runCommand(df.sparkSession)( CreateTable(tableDesc, mode, Some(df.logicalPlan))) @@ -708,7 +740,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { }.getOrElse(Seq.empty[Transform]) val bucketing = getBucketSpec.map(spec => CatalogV2Implicits.BucketSpecHelper(spec).asTransform).toSeq - partitioning ++ bucketing + val clustering = clusteringColumns.map { colNames => + ClusterByTransform(colNames.map(col => FieldReference(col))) + } + partitioning ++ bucketing ++ clustering } /** @@ -719,11 +754,25 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val v2Partitions = partitioningAsV2 if (v2Partitions.isEmpty) return require(v2Partitions.sameElements(existingTable.partitioning()), - "The provided partitioning does not match of the table.\n" + + "The provided partitioning or clustering columns do not match the existing table's.\n" + s" - provided: ${v2Partitions.mkString(", ")}\n" + s" - table: ${existingTable.partitioning().mkString(", ")}") } + /** + * Validate that clusterBy is not used with partitionBy or bucketBy. + */ + private def validatePartitioning(): Unit = { + if (clusteringColumns.nonEmpty) { + if (partitioningColumns.nonEmpty) { + throw QueryCompilationErrors.clusterByWithPartitionedBy() + } + if (getBucketSpec.nonEmpty) { + throw QueryCompilationErrors.clusterByWithBucketing() + } + } + } + /** * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the * table already exists in the external database, behavior of this function depends on the @@ -750,6 +799,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { assertNotPartitioned("jdbc") assertNotBucketed("jdbc") + assertNotClustered("jdbc") // connectionProperties should override settings in extraOptions. 
this.extraOptions ++= connectionProperties.asScala // explicit url and dbtable should override all @@ -916,5 +966,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private var numBuckets: Option[Int] = None + private var clusteringColumns: Option[Seq[String]] = None + private var sortColumnNames: Option[Seq[String]] = None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index b68a13ba2159..c970d0989e62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -24,7 +24,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} -import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} +import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.types.IntegerType @@ -54,6 +54,8 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) private var partitioning: Option[Seq[Transform]] = None + private var clustering: Option[ClusterByTransform] = None + override def using(provider: String): CreateTableWriter[T] = { this.provider = Some(provider) this @@ -104,9 +106,27 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } this.partitioning = Some(asTransforms) + validatePartitioning() this } + @scala.annotation.varargs + override def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] = { + this.clustering = + Some(ClusterByTransform((colName +: colNames).map(col => FieldReference(col)))) + validatePartitioning() + this + } + + /** + * Validate that clusterBy is not used with partitionBy. 
+ */ + private def validatePartitioning(): Unit = { + if (partitioning.nonEmpty && clustering.nonEmpty) { + throw QueryCompilationErrors.clusterByWithPartitionedBy() + } + } + override def create(): Unit = { val tableSpec = UnresolvedTableSpec( properties = properties.toMap, @@ -119,7 +139,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) runCommand( CreateTableAsSelect( UnresolvedIdentifier(tableName), - partitioning.getOrElse(Seq.empty), + partitioning.getOrElse(Seq.empty) ++ clustering, logicalPlan, tableSpec, options.toMap, @@ -207,7 +227,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) external = false) runCommand(ReplaceTableAsSelect( UnresolvedIdentifier(tableName), - partitioning.getOrElse(Seq.empty), + partitioning.getOrElse(Seq.empty) ++ clustering, logicalPlan, tableSpec, writeOptions = options.toMap, @@ -330,6 +350,14 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { */ def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] + /** + * Cluster the output table created by `create`, `createOrReplace`, or `replace` using + * the given columns. + * + * @since 4.0.0 + */ + def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] + /** * Specifies a provider for the underlying output data source. Spark's default catalog supports * "parquet", "json", etc. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index c80dc8307967..81eadcc263c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -52,6 +52,11 @@ object DataSourceUtils extends PredicateHelper { */ val PARTITION_OVERWRITE_MODE = "partitionOverwriteMode" + /** + * The key to use for storing clusterBy columns as options. + */ + val CLUSTERING_COLUMNS_KEY = "__clustering_columns" + /** * Utility methods for converting partitionBy columns to options and back. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index e4c3cd20dedb..80846f198620 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -198,6 +198,18 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical tableName, specifiedBucketString, existingBucketString) } + // Check if the specified clustering columns match the existing table. 
+ val specifiedClusterBySpec = tableDesc.clusterBySpec + val existingClusterBySpec = existingTable.clusterBySpec + if (specifiedClusterBySpec != existingClusterBySpec) { + val specifiedClusteringString = + specifiedClusterBySpec.map(_.toString).getOrElse("") + val existingClusteringString = + existingClusterBySpec.map(_.toString).getOrElse("") + throw QueryCompilationErrors.mismatchedTableClusteringError( + tableName, specifiedClusteringString, existingClusteringString) + } + val newQuery = if (adjustedColumns != query.output) { Project(adjustedColumns, query) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index 44d47abc93fa..2275d8c21397 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Ove import org.apache.spark.sql.connector.InMemoryV1Provider import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME -import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} +import org.apache.spark.sql.connector.expressions.{BucketTransform, ClusterByTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.streaming.MemoryStream @@ -524,6 +524,18 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo Seq(BucketTransform(LiteralValue(4, IntegerType), Seq(FieldReference("id"))))) } + test("Create: cluster by") { + spark.table("source") + .writeTo("testcat.table_name") + .clusterBy("id") + .create() + + val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name")) + + assert(table.name === "testcat.table_name") + assert(table.partitioning === Seq(ClusterByTransform(Seq(FieldReference("id"))))) + } + test("Create: fail if table already exists") { spark.sql( "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)") @@ -634,6 +646,42 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(replaced.properties === defaultOwnership.asJava) } + test("Replace: clustered table") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.sql("INSERT INTO TABLE testcat.table_name SELECT * FROM source") + + checkAnswer( + spark.table("testcat.table_name"), + Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"))) + + val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name")) + + // validate the initial table + assert(table.name === "testcat.table_name") + assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) + assert(table.partitioning.isEmpty) + assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) + + spark.table("source2") + .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd")) + .writeTo("testcat.table_name").clusterBy("id").replace() + + checkAnswer( + 
spark.table("testcat.table_name"), + Seq(Row(4L, "d", "even"), Row(5L, "e", "odd"), Row(6L, "f", "even"))) + + val replaced = catalog("testcat").loadTable(Identifier.of(Array(), "table_name")) + + // validate the replacement table + assert(replaced.name === "testcat.table_name") + assert(replaced.schema === new StructType() + .add("id", LongType) + .add("data", StringType) + .add("even_or_odd", StringType)) + assert(replaced.partitioning === Seq(ClusterByTransform(Seq(FieldReference("id"))))) + assert(replaced.properties === defaultOwnership.asJava) + } + test("Replace: fail if table does not exist") { val exc = intercept[CannotReplaceMissingTableException] { spark.table("source").writeTo("testcat.table_name").replace() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 994c420feae1..1bee3bf5c274 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1373,6 +1373,65 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { } } + test("Clustering columns should match when appending to existing data source tables") { + import testImplicits._ + val df = Seq((1, 2, 3)).toDF("a", "b", "c") + withTable("clusteredTable") { + df.write.mode("overwrite").clusterBy("a", "b").saveAsTable("clusteredTable") + // Misses some clustering columns + checkError( + exception = intercept[AnalysisException] { + df.write.mode("append").clusterBy("a").saveAsTable("clusteredTable") + }, + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.clusteredtable", + "specifiedClusteringString" -> """[["a"]]""", + "existingClusteringString" -> """[["a"],["b"]]""") + ) + // Wrong order + checkError( + exception = intercept[AnalysisException] { + df.write.mode("append").clusterBy("b", "a").saveAsTable("clusteredTable") + }, + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.clusteredtable", + "specifiedClusteringString" -> """[["b"],["a"]]""", + "existingClusteringString" -> """[["a"],["b"]]""") + ) + // Clustering columns not specified + checkError( + exception = intercept[AnalysisException] { + df.write.mode("append").saveAsTable("clusteredTable") + }, + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.clusteredtable", + "specifiedClusteringString" -> "", "existingClusteringString" -> """[["a"],["b"]]""") + ) + assert(sql("select * from clusteredTable").collect().length == 1) + // Inserts new data successfully when clustering columns are correctly specified in + // clusterBy(...). + Seq((4, 5, 6)).toDF("a", "b", "c") + .write + .mode("append") + .clusterBy("a", "b") + .saveAsTable("clusteredTable") + + Seq((7, 8, 9)).toDF("a", "b", "c") + .write + .mode("append") + .clusterBy("a", "b") + .saveAsTable("clusteredTable") + + checkAnswer( + sql("select a, b, c from clusteredTable"), + Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil + ) + } + } + test("show columns - negative test") { // When case sensitivity is true, the user supplied database name in table identifier // should match the supplied database name in case sensitive way. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala index 02e8a5e68999..c4e9ff93ef85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.functions.{col, struct} import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType} /** @@ -242,4 +243,54 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils { Row("# col_name", "data_type", "comment"))) } } + + test("describe a clustered table - dataframe writer v1") { + withNamespaceAndTable("ns", "tbl") { tbl => + val df = spark.range(10).select( + col("id").cast("string").as("col1"), + struct(col("id").cast("int").as("x"), col("id").cast("int").as("y")).as("col2")) + df.write.mode("append").clusterBy("col1", "col2.x").saveAsTable(tbl) + val descriptionDf = sql(s"DESC $tbl") + + descriptionDf.show(false) + assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( + ("col_name", StringType), + ("data_type", StringType), + ("comment", StringType))) + QueryTest.checkAnswer( + descriptionDf, + Seq( + Row("col1", "string", null), + Row("col2", "struct", null), + Row("# Clustering Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("col2.x", "int", null), + Row("col1", "string", null))) + } + } + + test("describe a clustered table - dataframe writer v2") { + withNamespaceAndTable("ns", "tbl") { tbl => + val df = spark.range(10).select( + col("id").cast("string").as("col1"), + struct(col("id").cast("int").as("x"), col("id").cast("int").as("y")).as("col2")) + df.writeTo(tbl).clusterBy("col1", "col2.x").create() + val descriptionDf = sql(s"DESC $tbl") + + descriptionDf.show(false) + assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( + ("col_name", StringType), + ("data_type", StringType), + ("comment", StringType))) + QueryTest.checkAnswer( + descriptionDf, + Seq( + Row("col1", "string", null), + Row("col2", "struct", null), + Row("# Clustering Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("col2.x", "int", null), + Row("col1", "string", null))) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 603ee74ce333..9f573381f324 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -286,6 +286,16 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2")) } + test("pass clusterBy as options") { + Seq(1).toDF().write + .format("org.apache.spark.sql.test") + .clusterBy("col1", "col2") + .save() + + val clusteringColumns = LastOptions.parameters(DataSourceUtils.CLUSTERING_COLUMNS_KEY) + assert(DataSourceUtils.decodePartitioningColumns(clusteringColumns) === Seq("col1", 
"col2")) + } + test ("SPARK-29537: throw exception when user defined a wrong base path") { withTempPath { p => val path = new Path(p.toURI).toString @@ -490,7 +500,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with assert(LastOptions.parameters("doubleOpt") == "6.7") } - test("check jdbc() does not support partitioning, bucketBy or sortBy") { + test("check jdbc() does not support partitioning, bucketBy, clusterBy or sortBy") { val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath) var w = df.write.partitionBy("value") @@ -505,6 +515,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) } + w = df.write.clusterBy("value") + e = intercept[AnalysisException](w.jdbc(null, null, null)) + Seq("jdbc", "clustering").foreach { s => + assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) + } + w = df.write.sortBy("value") e = intercept[AnalysisException](w.jdbc(null, null, null)) Seq("sortBy must be used together with bucketBy").foreach { s => From d60728d2d16ff00435ef989ea66a6f055eedb0ec Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Thu, 11 Jul 2024 09:33:53 -0700 Subject: [PATCH 02/10] fix mima --- project/MimaExcludes.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8cf872e4dd0f..c126d12b1473 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -100,7 +100,9 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext#implicits._sqlContext"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits._sqlContext"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.session"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SparkSession#implicits._sqlContext") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SparkSession#implicits._sqlContext"), + // SPARK-48761: Add clusterBy() to CreateTableWriter. + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.CreateTableWriter.clusterBy") ) // Default exclude rules From 2d288c63b49d5333c0ea7a9327989758f2fa90d2 Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Thu, 11 Jul 2024 09:48:43 -0700 Subject: [PATCH 03/10] include spark connect --- .../apache/spark/sql/DataFrameWriter.scala | 19 ++++++++++++ .../apache/spark/sql/DataFrameWriterV2.scala | 21 +++++++++++++ .../apache/spark/sql/ClientDatasetSuite.scala | 4 +++ .../apache/spark/sql/DataFrameWriter.scala | 31 +++++++++++-------- .../apache/spark/sql/DataFrameWriterV2.scala | 8 +++-- 5 files changed, 68 insertions(+), 15 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 563a9865e73f..1770b7bfcfa8 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { this } + /** + * Clusters the output by the given columns on the file system. 
The rows with matching values in + * the specified clustering columns will be consolidated within the same file. + * + * For instance, if you cluster a dataset by date, the data sharing the same date will be stored + * together in a file. This arrangement improves query efficiency when you apply selective + * filters to these clustering columns, thanks to data skipping. + * + * @since 4.0.0 + */ + @scala.annotation.varargs + def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = { + this.clusteringColumns = Option(colName +: colNames) + this + } + /** * Saves the content of the `DataFrame` at the specified path. * @@ -242,6 +258,7 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { source.foreach(builder.setSource) sortColumnNames.foreach(names => builder.addAllSortColumnNames(names.asJava)) partitioningColumns.foreach(cols => builder.addAllPartitioningColumns(cols.asJava)) + clusteringColumns.foreach(cols => builder.addAllClusteringColumns(cols.asJava)) numBuckets.foreach(n => { val bucketBuilder = proto.WriteOperation.BucketBy.newBuilder() @@ -515,4 +532,6 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { private var numBuckets: Option[Int] = None private var sortColumnNames: Option[Seq[String]] = None + + private var clusteringColumns: Option[Seq[String]] = None } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 7107895c0ad2..8ee0630b6d20 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -41,6 +41,8 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T]) private var partitioning: Option[Seq[proto.Expression]] = None + private var clustering: Option[Seq[String]] = None + private var overwriteCondition: Option[proto.Expression] = None override def using(provider: String): CreateTableWriter[T] = { @@ -77,6 +79,12 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T]) this } + @scala.annotation.varargs + override def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] = { + this.clustering = Some(colName +: colNames) + this + } + override def create(): Unit = { executeWriteOperation(proto.WriteOperationV2.Mode.MODE_CREATE) } @@ -145,6 +153,7 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T]) provider.foreach(builder.setProvider) partitioning.foreach(columns => builder.addAllPartitioningColumns(columns.asJava)) + clustering.foreach(columns => builder.addAllClusteringColumns(columns.asJava)) options.foreach { case (k, v) => builder.putOptions(k, v) @@ -274,6 +283,18 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { */ def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] + /** + * Clusters the output by the given columns on the file system. The rows with matching values in + * the specified clustering columns will be consolidated within the same file. + * + * For instance, if you cluster a dataset by date, the data sharing the same date will be stored + * together in a file. This arrangement improves query efficiency when you apply selective + * filters to these clustering columns, thanks to data skipping. 
+ * + * @since 4.0.0 + */ + def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] + /** * Specifies a provider for the underlying output data source. Spark's default catalog supports * "parquet", "json", etc. diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala index 9d6f07cf603a..c69cbcf6332e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala @@ -85,6 +85,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach { .setNumBuckets(2) .addBucketColumnNames("col1") .addBucketColumnNames("col2")) + .addClusteringColumns("col3") val expectedPlan = proto.Plan .newBuilder() @@ -95,6 +96,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach { .sortBy("col1") .partitionBy("col99") .bucketBy(2, "col1", "col2") + .clusterBy("col3") .parquet("my/test/path") val actualPlan = service.getAndClearLatestInputPlan() assert(actualPlan.equals(expectedPlan)) @@ -136,6 +138,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach { .setTableName("t1") .addPartitioningColumns(col("col99").expr) .setProvider("json") + .addClusteringColumns("col3") .putTableProperties("key", "value") .putOptions("key2", "value2") .setMode(proto.WriteOperationV2.Mode.MODE_CREATE_OR_REPLACE) @@ -147,6 +150,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach { df.writeTo("t1") .partitionedBy(col("col99")) + .clusterBy("col3") .using("json") .tableProperty("key", "value") .options(Map("key2" -> "value2")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 82a04a96b09e..12eaf04ebc04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -216,28 +216,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } /** - * Clusters the data by the given columns to optimize query performance. + * Sorts the output in each bucket by the given columns. * - * @since 4.0 + * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark + * 2.1.0. + * + * @since 2.0 */ @scala.annotation.varargs - def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = { - this.clusteringColumns = Option(colName +: colNames) - validatePartitioning() + def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = { + this.sortColumnNames = Option(colName +: colNames) this } /** - * Sorts the output in each bucket by the given columns. + * Clusters the output by the given columns on the file system. The rows with matching values in + * the specified clustering columns will be consolidated within the same file. * - * This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark - * 2.1.0. + * For instance, if you cluster a dataset by date, the data sharing the same date will be stored + * together in a file. This arrangement improves query efficiency when you apply selective + * filters to these clustering columns, thanks to data skipping. 
* - * @since 2.0 + * @since 4.0 */ @scala.annotation.varargs - def sortBy(colName: String, colNames: String*): DataFrameWriter[T] = { - this.sortColumnNames = Option(colName +: colNames) + def clusterBy(colName: String, colNames: String*): DataFrameWriter[T] = { + this.clusteringColumns = Option(colName +: colNames) + validatePartitioning() this } @@ -966,7 +971,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private var numBuckets: Option[Int] = None - private var clusteringColumns: Option[Seq[String]] = None - private var sortColumnNames: Option[Seq[String]] = None + + private var clusteringColumns: Option[Seq[String]] = None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index c970d0989e62..65c0748153f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -351,8 +351,12 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] /** - * Cluster the output table created by `create`, `createOrReplace`, or `replace` using - * the given columns. + * Clusters the output by the given columns on the file system. The rows with matching values in + * the specified clustering columns will be consolidated within the same file. + * + * For instance, if you cluster a dataset by date, the data sharing the same date will be stored + * together in a file. This arrangement improves query efficiency when you apply selective + * filters to these clustering columns, thanks to data skipping. * * @since 4.0.0 */ From 9e155b0fde32fd2e0fced5bf5cf5cc676b95208a Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Thu, 11 Jul 2024 14:37:36 -0700 Subject: [PATCH 04/10] add missing piece --- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 4702f09a14c2..d5da794c712f 100644 --- a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3072,6 +3072,11 @@ class SparkConnectPlanner( w.partitionBy(names.toSeq: _*) } + if (writeOperation.getClusteringColumnsCount > 0) { + val names = writeOperation.getClusteringColumnsList.asScala + w.clusterBy(names.toSeq: _*) + } + if (writeOperation.hasSource) { w.format(writeOperation.getSource) } @@ -3188,6 +3193,10 @@ class SparkConnectPlanner( writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*) } + if (writeOp.getClusteringColumnNamesCount > 0) { + writer.clusterBy(writeOp.getClusteringColumnNamesList.asScala.toList: _*) + } + writeOp.getTriggerCase match { case TriggerCase.PROCESSING_TIME_INTERVAL => writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval)) From b5988f2fe058c61e35606fe2fbe73f0260f1b3d4 Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Mon, 15 Jul 2024 14:41:49 -0700 Subject: [PATCH 05/10] Fix test --- .../connect/planner/SparkConnectPlanner.scala | 11 ++--- .../spark/sql/connect/dsl/package.scala | 4 ++ .../planner/SparkConnectProtoSuite.scala | 42 +++++++++++++++++++ .../sql/execution/datasources/rules.scala 
| 7 +++- 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index d5da794c712f..0bb3c3a6ecb8 100644 --- a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3074,7 +3074,7 @@ class SparkConnectPlanner( if (writeOperation.getClusteringColumnsCount > 0) { val names = writeOperation.getClusteringColumnsList.asScala - w.clusterBy(names.toSeq: _*) + w.clusterBy(names.head, names.tail.toSeq: _*) } if (writeOperation.hasSource) { @@ -3140,6 +3140,11 @@ class SparkConnectPlanner( w.partitionedBy(names.head, names.tail: _*) } + if (writeOperation.getClusteringColumnsCount > 0) { + val names = writeOperation.getClusteringColumnsList.asScala + w.clusterBy(names.head, names.tail.toSeq: _*) + } + writeOperation.getMode match { case proto.WriteOperationV2.Mode.MODE_CREATE => if (writeOperation.hasProvider) { @@ -3193,10 +3198,6 @@ class SparkConnectPlanner( writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*) } - if (writeOp.getClusteringColumnNamesCount > 0) { - writer.clusterBy(writeOp.getClusteringColumnNamesList.asScala.toList: _*) - } - writeOp.getTriggerCase match { case TriggerCase.PROCESSING_TIME_INTERVAL => writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval)) diff --git a/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala b/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala index 3edb63ee8e81..fdbfc39cc9a5 100644 --- a/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -219,6 +219,7 @@ package object dsl { mode: Option[String] = None, sortByColumns: Seq[String] = Seq.empty, partitionByCols: Seq[String] = Seq.empty, + clusterByCols: Seq[String] = Seq.empty, bucketByCols: Seq[String] = Seq.empty, numBuckets: Option[Int] = None): Command = { val writeOp = WriteOperation.newBuilder() @@ -242,6 +243,7 @@ package object dsl { } sortByColumns.foreach(writeOp.addSortColumnNames(_)) partitionByCols.foreach(writeOp.addPartitioningColumns(_)) + clusterByCols.foreach(writeOp.addClusteringColumns(_)) if (numBuckets.nonEmpty && bucketByCols.nonEmpty) { val op = WriteOperation.BucketBy.newBuilder() @@ -272,6 +274,7 @@ package object dsl { options: Map[String, String] = Map.empty, tableProperties: Map[String, String] = Map.empty, partitionByCols: Seq[Expression] = Seq.empty, + clusterByCols: Seq[String] = Seq.empty, mode: Option[String] = None, overwriteCondition: Option[Expression] = None): Command = { val writeOp = WriteOperationV2.newBuilder() @@ -279,6 +282,7 @@ package object dsl { tableName.foreach(writeOp.setTableName) provider.foreach(writeOp.setProvider) partitionByCols.foreach(writeOp.addPartitioningColumns) + clusterByCols.foreach(writeOp.addClusteringColumns) options.foreach { case (k, v) => writeOp.putOptions(k, v) } diff --git a/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala index 6721555220fe..190f8cde16f5 100644 --- 
a/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala +++ b/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala @@ -596,6 +596,48 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest { } } + test("Write with clustering") { + // Cluster by existing column. + withTable("testtable") { + transform( + localRelation.write( + tableName = Some("testtable"), + tableSaveMethod = Some("save_as_table"), + format = Some("parquet"), + clusterByCols = Seq("id"))) + } + + // Cluster by non-existing column. + assertThrows[AnalysisException]( + transform( + localRelation + .write( + tableName = Some("testtable"), + tableSaveMethod = Some("save_as_table"), + format = Some("parquet"), + clusterByCols = Seq("noid")))) + } + + test("Write V2 with clustering") { + // Cluster by existing column. + withTable("testtable") { + transform( + localRelation.writeV2( + tableName = Some("testtable"), + mode = Some("MODE_CREATE"), + clusterByCols = Seq("id"))) + } + + // Cluster by non-existing column. + assertThrows[AnalysisException]( + transform( + localRelation + .writeV2( + tableName = Some("testtable"), + mode = Some("MODE_CREATE"), + clusterByCols = Seq("noid")))) + } + test("Write with invalid bucketBy configuration") { val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0)) assertThrows[InvalidCommandInput] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 80846f198620..37ccf54d932b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -331,7 +331,12 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical } } - table.copy(partitionColumnNames = normalizedPartCols, bucketSpec = normalizedBucketSpec) + val normalizedProperties = table.properties ++ table.clusterBySpec.map { spec => + ClusterBySpec.toProperty(schema, spec, conf.resolver) + } + + table.copy(partitionColumnNames = normalizedPartCols, bucketSpec = normalizedBucketSpec, + properties = normalizedProperties) } private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = { From 32563eaa52db06f0191a162570f73d613f54a214 Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Mon, 22 Jul 2024 08:54:21 -0700 Subject: [PATCH 06/10] address comments --- .../org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- .../spark/sql/catalyst/catalog/interface.scala | 15 ++++++++++++++- .../org/apache/spark/sql/DataFrameWriterV2.scala | 4 ++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1770b7bfcfa8..f775a18ec710 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -202,8 +202,8 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { } /** - * Clusters the output by the given columns on the file system. The rows with matching values in - * the specified clustering columns will be consolidated within the same file. + * Clusters the output by the given columns on the storage. 
The rows with matching values in + * the specified clustering columns will be consolidated within the same group. * * For instance, if you cluster a dataset by date, the data sharing the same date will be stored * together in a file. This arrangement improves query efficiency when you apply selective diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 5030d9264bd1..bea5e247148a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -197,10 +197,22 @@ object ClusterBySpec { ret } + /** + * Converts the clustering column property to a ClusterBySpec. + */ def fromProperty(columns: String): ClusterBySpec = { ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_))) } + /** + * Converts a ClusterBySpec to a clustering column property map entry, with validation + * of the column names against the schema. + * + * @param schema the schema of the table. + * @param clusterBySpec the ClusterBySpec to be converted to a property. + * @param resolver the resolver used to match the column names. + * @return a map entry for the clustering column property. + */ def toProperty( schema: StructType, clusterBySpec: ClusterBySpec, @@ -213,7 +225,8 @@ object ClusterBySpec { * Converts a ClusterBySpec to a map of table properties used to store the clustering * information in the table catalog. * - * @param clusterBySpec : existing ClusterBySpec to be converted to properties. + * @param clusterBySpec existing ClusterBySpec to be converted to properties. + * @return Map of properties to be stored in the CatalogTable. */ def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = { val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 65c0748153f0..eef21fefe94e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -351,8 +351,8 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] /** - * Clusters the output by the given columns on the file system. The rows with matching values in - * the specified clustering columns will be consolidated within the same file. + * Clusters the output by the given columns on the storage. The rows with matching values in + * the specified clustering columns will be consolidated within the same group. * * For instance, if you cluster a dataset by date, the data sharing the same date will be stored * together in a file. 
This arrangement improves query efficiency when you apply selective From 35412d9dd382261d85efdecbde9be946ed1fcb0b Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Mon, 22 Jul 2024 12:46:48 -0700 Subject: [PATCH 07/10] scalafmt --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f775a18ec710..616bc8151396 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -202,8 +202,8 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { } /** - * Clusters the output by the given columns on the storage. The rows with matching values in - * the specified clustering columns will be consolidated within the same group. + * Clusters the output by the given columns on the storage. The rows with matching values in the + * specified clustering columns will be consolidated within the same group. * * For instance, if you cluster a dataset by date, the data sharing the same date will be stored * together in a file. This arrangement improves query efficiency when you apply selective From 9ff7cab4aee1848005ea1793fb247a9405105e5a Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Mon, 22 Jul 2024 23:20:58 -0700 Subject: [PATCH 08/10] update comments --- .../main/scala/org/apache/spark/sql/DataFrameWriterV2.scala | 6 ++++-- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- .../main/scala/org/apache/spark/sql/DataFrameWriterV2.scala | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 8ee0630b6d20..cb7e1f13bd01 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -281,11 +281,12 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { * * @since 3.4.0 */ + @scala.annotation.varargs def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] /** - * Clusters the output by the given columns on the file system. The rows with matching values in - * the specified clustering columns will be consolidated within the same file. + * Clusters the output by the given columns on the storage. The rows with matching values in the + * specified clustering columns will be consolidated within the same group. * * For instance, if you cluster a dataset by date, the data sharing the same date will be stored * together in a file. 
This arrangement improves query efficiency when you apply selective @@ -293,6 +294,7 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { * * @since 4.0.0 */ + @scala.annotation.varargs def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 12eaf04ebc04..6b7a5404fa51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -230,8 +230,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } /** - * Clusters the output by the given columns on the file system. The rows with matching values in - * the specified clustering columns will be consolidated within the same file. + * Clusters the output by the given columns on the storage. The rows with matching values in the + * specified clustering columns will be consolidated within the same group. * * For instance, if you cluster a dataset by date, the data sharing the same date will be stored * together in a file. This arrangement improves query efficiency when you apply selective diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index eef21fefe94e..df1f8b5c6dfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -348,6 +348,7 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { * * @since 3.0.0 */ + @scala.annotation.varargs def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] /** @@ -360,6 +361,7 @@ trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] { * * @since 4.0.0 */ + @scala.annotation.varargs def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] /** From 6599fa28a2fe0b7fa9b20b719667cd686ff3f826 Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Tue, 23 Jul 2024 07:46:47 -0700 Subject: [PATCH 09/10] address comment --- .../apache/spark/sql/catalyst/catalog/interface.scala | 10 +++++----- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index bea5e247148a..c9dabb41b531 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -222,15 +222,15 @@ object ClusterBySpec { } /** - * Converts a ClusterBySpec to a map of table properties used to store the clustering - * information in the table catalog. + * Converts a ClusterBySpec to a clustering column property map entry, without validating + * the column names against the schema. * * @param clusterBySpec existing ClusterBySpec to be converted to properties. - * @return Map of properties to be stored in the CatalogTable. + * @return a map entry for the clustering column property. 
*/ - def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = { + def toPropertyWithoutValidation(clusterBySpec: ClusterBySpec): (String, String) = { val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames)) - Map(CatalogTable.PROP_CLUSTERING_COLUMNS -> columnValue) + (CatalogTable.PROP_CLUSTERING_COLUMNS -> columnValue) } private def normalizeClusterBySpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6b7a5404fa51..e79af54e0bfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -721,7 +721,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val properties = if (clusteringColumns.isEmpty) { Map.empty[String, String] } else { - ClusterBySpec.toProperties(ClusterBySpec.fromColumnNames(clusteringColumns.get)) + Map(ClusterBySpec.toPropertyWithoutValidation( + ClusterBySpec.fromColumnNames(clusteringColumns.get))) } val tableDesc = CatalogTable( From 8ab9e60d4f3a4710f42d9fbbe306714ee37308bb Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Wed, 24 Jul 2024 08:50:17 -0700 Subject: [PATCH 10/10] address comments --- .../sql/catalyst/catalog/interface.scala | 3 +- .../apache/spark/sql/DataFrameWriter.scala | 2 +- .../sql/execution/command/DDLSuite.scala | 59 ------------------- .../sql/test/DataFrameReaderWriterSuite.scala | 59 +++++++++++++++++++ 4 files changed, 61 insertions(+), 62 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c9dabb41b531..dcd1d3137da3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -229,8 +229,7 @@ object ClusterBySpec { * @return a map entry for the clustering column property. 
*/ def toPropertyWithoutValidation(clusterBySpec: ClusterBySpec): (String, String) = { - val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames)) - (CatalogTable.PROP_CLUSTERING_COLUMNS -> columnValue) + (CatalogTable.PROP_CLUSTERING_COLUMNS -> clusterBySpec.toJson) } private def normalizeClusterBySpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e79af54e0bfe..991487170f17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -747,7 +747,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val bucketing = getBucketSpec.map(spec => CatalogV2Implicits.BucketSpecHelper(spec).asTransform).toSeq val clustering = clusteringColumns.map { colNames => - ClusterByTransform(colNames.map(col => FieldReference(col))) + ClusterByTransform(colNames.map(FieldReference(_))) } partitioning ++ bucketing ++ clustering } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 1bee3bf5c274..994c420feae1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1373,65 +1373,6 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { } } - test("Clustering columns should match when appending to existing data source tables") { - import testImplicits._ - val df = Seq((1, 2, 3)).toDF("a", "b", "c") - withTable("clusteredTable") { - df.write.mode("overwrite").clusterBy("a", "b").saveAsTable("clusteredTable") - // Misses some clustering columns - checkError( - exception = intercept[AnalysisException] { - df.write.mode("append").clusterBy("a").saveAsTable("clusteredTable") - }, - errorClass = "CLUSTERING_COLUMNS_MISMATCH", - parameters = Map( - "tableName" -> "spark_catalog.default.clusteredtable", - "specifiedClusteringString" -> """[["a"]]""", - "existingClusteringString" -> """[["a"],["b"]]""") - ) - // Wrong order - checkError( - exception = intercept[AnalysisException] { - df.write.mode("append").clusterBy("b", "a").saveAsTable("clusteredTable") - }, - errorClass = "CLUSTERING_COLUMNS_MISMATCH", - parameters = Map( - "tableName" -> "spark_catalog.default.clusteredtable", - "specifiedClusteringString" -> """[["b"],["a"]]""", - "existingClusteringString" -> """[["a"],["b"]]""") - ) - // Clustering columns not specified - checkError( - exception = intercept[AnalysisException] { - df.write.mode("append").saveAsTable("clusteredTable") - }, - errorClass = "CLUSTERING_COLUMNS_MISMATCH", - parameters = Map( - "tableName" -> "spark_catalog.default.clusteredtable", - "specifiedClusteringString" -> "", "existingClusteringString" -> """[["a"],["b"]]""") - ) - assert(sql("select * from clusteredTable").collect().length == 1) - // Inserts new data successfully when clustering columns are correctly specified in - // clusterBy(...). 
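// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch series): roughly how the clustering
// columns end up serialized into a catalog table property after the change above,
// where toPropertyWithoutValidation now delegates to ClusterBySpec.toJson and
// returns a single (key, value) pair instead of a Map.
// Assumptions: this snippet would live inside Spark's own sql/catalyst (test)
// sources so the internal catalyst classes are visible; every call below is taken
// from the diffs in this series, while the object name and sample columns are
// made up for the example.
// ---------------------------------------------------------------------------
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ClusterBySpec}

object ClusteringPropertySketch {
  def main(args: Array[String]): Unit = {
    // Build a ClusterBySpec from plain column names, the same way DataFrameWriter
    // does via ClusterBySpec.fromColumnNames(clusteringColumns.get).
    val spec = ClusterBySpec.fromColumnNames(Seq("a", "b"))

    // The helper hands back one (key, value) entry; the caller decides how to
    // merge it into the table properties (DataFrameWriter wraps it in Map(...)).
    val (key, value) = ClusterBySpec.toPropertyWithoutValidation(spec)

    // key is CatalogTable.PROP_CLUSTERING_COLUMNS; value is the JSON form of the
    // column names, e.g. [["a"],["b"]] -- the same shape that appears in the
    // CLUSTERING_COLUMNS_MISMATCH error messages in the tests.
    assert(key == CatalogTable.PROP_CLUSTERING_COLUMNS)
    println(s"$key -> $value")
  }
}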
- Seq((4, 5, 6)).toDF("a", "b", "c") - .write - .mode("append") - .clusterBy("a", "b") - .saveAsTable("clusteredTable") - - Seq((7, 8, 9)).toDF("a", "b", "c") - .write - .mode("append") - .clusterBy("a", "b") - .saveAsTable("clusteredTable") - - checkAnswer( - sql("select a, b, c from clusteredTable"), - Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil - ) - } - } - test("show columns - negative test") { // When case sensitivity is true, the user supplied database name in table identifier // should match the supplied database name in case sensitive way. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 9f573381f324..377c422b22ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -296,6 +296,65 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with assert(DataSourceUtils.decodePartitioningColumns(clusteringColumns) === Seq("col1", "col2")) } + test("Clustering columns should match when appending to existing data source tables") { + import testImplicits._ + val df = Seq((1, 2, 3)).toDF("a", "b", "c") + withTable("clusteredTable") { + df.write.mode("overwrite").clusterBy("a", "b").saveAsTable("clusteredTable") + // Misses some clustering columns + checkError( + exception = intercept[AnalysisException] { + df.write.mode("append").clusterBy("a").saveAsTable("clusteredTable") + }, + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.clusteredtable", + "specifiedClusteringString" -> """[["a"]]""", + "existingClusteringString" -> """[["a"],["b"]]""") + ) + // Wrong order + checkError( + exception = intercept[AnalysisException] { + df.write.mode("append").clusterBy("b", "a").saveAsTable("clusteredTable") + }, + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.clusteredtable", + "specifiedClusteringString" -> """[["b"],["a"]]""", + "existingClusteringString" -> """[["a"],["b"]]""") + ) + // Clustering columns not specified + checkError( + exception = intercept[AnalysisException] { + df.write.mode("append").saveAsTable("clusteredTable") + }, + errorClass = "CLUSTERING_COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.clusteredtable", + "specifiedClusteringString" -> "", "existingClusteringString" -> """[["a"],["b"]]""") + ) + assert(sql("select * from clusteredTable").collect().length == 1) + // Inserts new data successfully when clustering columns are correctly specified in + // clusterBy(...). + Seq((4, 5, 6)).toDF("a", "b", "c") + .write + .mode("append") + .clusterBy("a", "b") + .saveAsTable("clusteredTable") + + Seq((7, 8, 9)).toDF("a", "b", "c") + .write + .mode("append") + .clusterBy("a", "b") + .saveAsTable("clusteredTable") + + checkAnswer( + sql("select a, b, c from clusteredTable"), + Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil + ) + } + } + test ("SPARK-29537: throw exception when user defined a wrong base path") { withTempPath { p => val path = new Path(p.toURI).toString
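// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the patch series): the end-to-end
// behaviour that the relocated test above exercises, written as a tiny
// standalone program. Assumes a Spark build that already contains these
// commits; the table name, SparkSession settings, and sample data are made up
// for the example.
// ---------------------------------------------------------------------------
import org.apache.spark.sql.{AnalysisException, SparkSession}

object ClusterByAppendExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("clusterBy append demo")
      .getOrCreate()
    import spark.implicits._

    // Create a data source table clustered by (a, b) using the new
    // DataFrameWriter.clusterBy API.
    Seq((1, 2, 3)).toDF("a", "b", "c")
      .write.mode("overwrite").clusterBy("a", "b").saveAsTable("demo_clustered")

    // Appending with the same clustering columns is accepted.
    Seq((4, 5, 6)).toDF("a", "b", "c")
      .write.mode("append").clusterBy("a", "b").saveAsTable("demo_clustered")

    // Appending with different (or missing) clustering columns is rejected
    // with the CLUSTERING_COLUMNS_MISMATCH error introduced by this series.
    try {
      Seq((7, 8, 9)).toDF("a", "b", "c")
        .write.mode("append").clusterBy("a").saveAsTable("demo_clustered")
    } catch {
      case e: AnalysisException =>
        // The message spells out the specified vs. existing clustering columns.
        println(e.getMessage)
    }

    spark.stop()
  }
}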