@@ -166,7 +166,15 @@ case class InsertIntoHadoopFsRelationCommand(
 
 
       // update metastore partition metadata
-      refreshUpdatedPartitions(updatedPartitionPaths)
+      if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
+          && partitionColumns.length == staticPartitions.size) {
+        // Ensure an empty static partition can still be loaded into the datasource table.
+        val staticPathFragment =
+          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
+        refreshUpdatedPartitions(Set(staticPathFragment))
+      } else {
+        refreshUpdatedPartitions(updatedPartitionPaths)
+      }
 
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
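
Note: the new branch covers an INSERT whose partition spec is fully static but whose query returns no rows. In that case updatedPartitionPaths is empty, so without the branch the static partition would never be registered with the metastore. A minimal sketch of what the computed path fragment looks like, assuming the helper simply joins col=value pairs in partition-column order (the real PartitioningUtils.getPathFragment also escapes names and values):

    // Simplified stand-in for PartitioningUtils.getPathFragment (no escaping).
    def pathFragmentSketch(spec: Map[String, String], partitionColumns: Seq[String]): String =
      partitionColumns.map(col => s"$col=${spec(col)}").mkString("/")

    // PARTITION(b='b', c='c') on a table partitioned by (c, b):
    pathFragmentSketch(Map("b" -> "b", "c" -> "c"), Seq("c", "b"))  // "c=c/b=b"
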
@@ -30,7 +30,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -284,6 +284,10 @@ object PartitioningUtils {
     }.mkString("/")
   }
 
+  def getPathFragment(spec: TablePartitionSpec, partitionColumns: Seq[Attribute]): String = {
+    getPathFragment(spec, StructType.fromAttributes(partitionColumns))
+  }
+
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
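
Note: the overload only adapts the Seq[Attribute] that InsertIntoHadoopFsRelationCommand already holds to the existing StructType-based getPathFragment. A hedged spark-shell sketch of calling it directly, with AttributeReference standing in for the attributes a real query plan would carry:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.execution.datasources.PartitioningUtils
    import org.apache.spark.sql.types.StringType

    // TablePartitionSpec is an alias for Map[String, String].
    val partitionColumns = Seq(
      AttributeReference("c", StringType)(),
      AttributeReference("b", StringType)())
    PartitioningUtils.getPathFragment(Map("c" -> "c", "b" -> "b"), partitionColumns)
    // expected: "c=c/b=b"
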
@@ -2249,6 +2249,68 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("Partition table should load empty static partitions") {
+    // All static partitions
+    withTable("t", "t1", "t2") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, c string, b string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
+
+        // datasource table
+        validateStaticPartitionTable("t1")
+
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validateStaticPartitionTable("t2")
+        }
+
+        def validateStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(b='b', c='c') SELECT * FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 1)
+          assert(new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
+      }
+    }
+
+    // Partial dynamic partitions
+    withTable("t", "t1", "t2") {
+      withTempPath { dir =>
+        spark.sql("CREATE TABLE t(a int) USING parquet")
+        spark.sql("CREATE TABLE t1(a int, b string, c string) " +
+          s"USING parquet PARTITIONED BY(c, b) LOCATION '${dir.toURI}'")
+
+        // datasource table
+        validatePartialStaticPartitionTable("t1")
+
+        // hive table
+        if (isUsingHiveMetastore) {
+          spark.sql("CREATE TABLE t2(a int) " +
+            s"PARTITIONED BY(c string, b string) LOCATION '${dir.toURI}'")
+          validatePartialStaticPartitionTable("t2")
+        }
+
+        def validatePartialStaticPartitionTable(tableName: String): Unit = {
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          spark.sql(
+            s"INSERT INTO TABLE $tableName PARTITION(c='c', b) SELECT *, 'b' FROM t WHERE 1 = 0")
+          assert(spark.sql(s"SHOW PARTITIONS $tableName").count() == 0)
+          assert(!new File(dir, "c=c/b=b").exists())
+          checkAnswer(spark.table(tableName), Nil)
+        }
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
     val tcName = if (shouldDelete) "non-existing" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
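
Note: taken together, the two tests pin down the intended semantics: a fully static partition spec creates and registers the partition even when the query returns no rows, while a spec with any dynamic column creates nothing for an empty result. An illustrative spark-shell session (table names demo/src are hypothetical, not from the patch):

    spark.sql("CREATE TABLE src(a INT) USING parquet")
    spark.sql("CREATE TABLE demo(a INT, p STRING) USING parquet PARTITIONED BY (p)")

    // Fully static spec with an empty query: p='x' is still registered.
    spark.sql("INSERT INTO demo PARTITION (p = 'x') SELECT * FROM src WHERE 1 = 0")
    spark.sql("SHOW PARTITIONS demo").show()  // one partition: p=x

    // Dynamic spec with an empty query: no partition is created.
    spark.sql("INSERT INTO demo PARTITION (p) SELECT a, 'y' FROM src WHERE 1 = 0")
    spark.sql("SHOW PARTITIONS demo").show()  // still only p=x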