@@ -1,19 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

@@ -432,7 +432,7 @@ case class DataSource(
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)

// If we are appending to a table that already exists, make sure the partitioning matches
@@ -339,7 +339,7 @@ private[sql] object PartitioningUtils {
private val upCastingOrder: Seq[DataType] =
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)

- def validatePartitionColumnDataTypes(
+ def validatePartitionColumn(
schema: StructType,
partitionColumns: Seq[String],
caseSensitive: Boolean): Unit = {
@@ -350,6 +350,10 @@ private[sql] object PartitioningUtils {
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
}
}

if (partitionColumns.size == schema.fields.size) {
throw new AnalysisException(s"Cannot use all columns for partition columns")
}
}
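To read the renamed method as a whole rather than as diff fragments, here is a minimal sketch of what validatePartitionColumn might look like after this change. It is a reconstruction from the hunks above, not the verbatim Spark source; the column resolution and type check are simplified, and only the final count check is literally what this PR adds.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._

// Sketch only: the real method lives in PartitioningUtils and delegates to partitionColumnsSchema.
def validatePartitionColumn(
    schema: StructType,
    partitionColumns: Seq[String],
    caseSensitive: Boolean): Unit = {

  def matches(a: String, b: String): Boolean =
    if (caseSensitive) a == b else a.equalsIgnoreCase(b)

  partitionColumns.foreach { col =>
    val field = schema.find(f => matches(f.name, col)).getOrElse {
      throw new AnalysisException(s"Partition column $col not found in schema $schema")
    }
    field.dataType match {
      case _: StructType | _: ArrayType | _: MapType =>
        throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
      case _ => // assumed acceptable for this sketch
    }
  }

  // The check added by this PR: partitioning by every column leaves no data columns to store.
  if (partitionColumns.size == schema.fields.size) {
    throw new AnalysisException("Cannot use all columns for partition columns")
  }
}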

Reviewer (@wangyang1992):

One little concern: if the check is added here, should the method name be changed? After all, it will do more than validate data types after the change.

Member Author:

Thank you for the attention, @wangyang1992. Good point!
Maybe validatePartitionColumnDataTypes -> validatePartitionColumnDataTypesAndCount?

Reviewer (@wangyang1992):

Yeah, I think that's better.

Member Author:

Then, let's change it. :)
Since PartitioningUtils is private[sql], it's safe to change.
I'll update this PR. Thank you for your review and idea!


def partitionColumnsSchema(
@@ -359,7 +363,7 @@ private[sql] object PartitioningUtils {
val equality = columnNameEquality(caseSensitive)
StructType(partitionColumns.map { col =>
schema.find(f => equality(f.name, col)).getOrElse {
- throw new RuntimeException(s"Partition column $col not found in schema $schema")
+ throw new AnalysisException(s"Partition column $col not found in schema $schema")
}
}).asNullable
}
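A side note on the RuntimeException -> AnalysisException change above: AnalysisException is the type Spark uses for user-facing analysis errors, so callers and test suites can intercept an unresolvable partition column the same way the new test below intercepts the all-columns case. The following suite is only an illustration, not part of this PR; the suite name, column name, and message assertion are assumptions, and an earlier layer may report the error with different wording.

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.test.SharedSQLContext

// Illustrative only: an unknown partition column should surface as an AnalysisException.
class PartitionColumnValidationSuite extends QueryTest with SharedSQLContext {
  test("unresolvable partition column fails analysis") {
    withTempPath { dir =>
      val e = intercept[AnalysisException] {
        spark.range(10).write.partitionBy("no_such_column").parquet(dir.getCanonicalPath)
      }
      assert(e.getMessage.contains("no_such_column"))
    }
  }
}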
@@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// OK
}

- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)

// Get all input data source relations of the query.
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// OK
}

- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)

for {
@@ -91,7 +91,7 @@ class FileStreamSinkWriter(
hadoopConf: Configuration,
options: Map[String, String]) extends Serializable with Logging {

- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)

private val serializableConf = new SerializableConfiguration(hadoopConf)
@@ -572,4 +572,16 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {

cq.awaitTermination(2000L)
}

test("prevent all column partitioning") {
withTempDir { dir =>
val path = dir.getCanonicalPath
intercept[AnalysisException] {
spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
}
intercept[AnalysisException] {
spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
Member:

This test case is wrong, right? The exception message will be like "The ORC data source must be used with Hive support enabled". To test this, we need to move it to another suite.

Will fix it in my ongoing PR. My suggestion is to always verify the error message, if possible.

Member Author:

Yep. That will be fixed in #13730 .

Member Author:

Oh, I mean there is already an ongoing PR. And thank you for the advice!

Member:

I am fine if you want to fix it. You can put it in OrcSuite, then run the following command to verify whether it passes:

build/sbt -Phive "hive/test-only org.apache.spark.sql.hive.orc.OrcSourceSuite"

  test("prevent all column partitioning") {
    withTempDir { dir =>
      val path = dir.getCanonicalPath
      val e = intercept[AnalysisException] {
        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
      }.getMessage
      assert(e.contains("Cannot use all columns for partition columns"))
    }
  }

Member Author:

Ah, I see what you mean. I'm going to exclude the ORC cases in #13730 .
So, for the OrcSuite, you can do that. If you do, I'd really appreciate it.

Member:

I am trying to add more edge cases in the ongoing PR to improve the test coverage of DataFrameReader and DataFrameWriter. I will include this one too. Thanks!

}
}
}
}
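For completeness, the user-facing effect of the new guard, sketched as something one might run in spark-shell (the output path is illustrative; the expected message comes from the check added in PartitioningUtils):

import org.apache.spark.sql.AnalysisException

// spark.range(10) has a single column ("id"), so partitioning by "id" uses every column.
val df = spark.range(10)
try {
  df.write.mode("overwrite").partitionBy("id").parquet("/tmp/all-columns-partition-demo")
} catch {
  case e: AnalysisException =>
    // Expected to contain: "Cannot use all columns for partition columns"
    println(e.getMessage)
}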