From 826dee6a401b1eb19fa197271b13f3f594b97c6a Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Wed, 23 Nov 2016 13:50:14 -0800 Subject: [PATCH 01/12] [SPARK-18572][SQL] Add a method `listPartitionNames` to `ExternalCatalog` and an implementation in `HiveExternalCatalog` that calls the Hive `getPartitionNames` method. --- .../catalyst/catalog/ExternalCatalog.scala | 32 +++++++++++++++++++ .../sql/catalyst/catalog/SessionCatalog.scala | 17 ++++++++++ .../spark/sql/execution/command/tables.scala | 12 +------ .../datasources/PartitioningUtils.scala | 13 ++++++-- .../spark/sql/hive/HiveExternalCatalog.scala | 24 ++++++++++++++ .../spark/sql/hive/client/HiveClient.scala | 19 +++++++++++ .../sql/hive/client/HiveClientImpl.scala | 14 ++++++++ .../spark/sql/hive/client/VersionsSuite.scala | 5 +++ 8 files changed, 123 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 259008f183b5..cb0332f337de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -189,6 +189,38 @@ abstract class ExternalCatalog { table: String, spec: TablePartitionSpec): Option[CatalogTablePartition] + /** + * List the names of all partitions that belong to the specified table, assuming it exists. + * + * A partial partition spec may optionally be provided to filter the partitions returned. + * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), + * then a partial spec of (a='1') will return the first two only. + * + * We provide a default implementation here which simply delegates to the `listPartitions` + * method. For efficiency's sake, overriding this method is recommended for external catalogs + * that can list partition names directly. + * @param db database name + * @param table table name + * @param partialSpec partition spec + */ + def listPartitionNames( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = { + def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { + partColNames.map { name => + ExternalCatalogUtils.escapePathName(name) + "=" + + ExternalCatalogUtils.escapePathName(spec(name)) + }.mkString("/") + } + + val catalogTable = getTable(db, table) + + listPartitions(db, table, partialSpec).map { p => + getPartName(p.spec, catalogTable.partitionColumnNames) + } + } + /** * List the metadata of all partitions that belong to the specified table, assuming it exists. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index da3a2079f42d..43f51eec6c98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -748,6 +748,23 @@ class SessionCatalog( externalCatalog.getPartition(db, table, spec) } + /** + * List the names of all partitions that belong to the specified table, assuming it exists. + * + * A partial partition spec may optionally be provided to filter the partitions returned.
+ * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), + * then a partial spec of (a='1') will return the first two only. + */ + def listPartitionNames( + tableName: TableIdentifier, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = { + val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) + val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) + externalCatalog.listPartitionNames(db, table, partialSpec) + } + /** * List the metadata of all partitions that belong to the specified table, assuming it exists. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index dc0720d78d46..32e2f7573739 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -729,13 +729,6 @@ case class ShowPartitionsCommand( AttributeReference("partition", StringType, nullable = false)() :: Nil } - private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { - partColNames.map { name => - ExternalCatalogUtils.escapePathName(name) + "=" + - ExternalCatalogUtils.escapePathName(spec(name)) - }.mkString(File.separator) - } - override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) @@ -772,10 +765,7 @@ case class ShowPartitionsCommand( } } - val partNames = catalog.listPartitions(tableName, spec).map { p => - getPartName(p.spec, table.partitionColumnNames) - } - + val partNames = catalog.listPartitionNames(tableName, spec) partNames.map(Row(_)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index bf9f318780ec..bc290702dc37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -244,13 +244,22 @@ object PartitioningUtils { /** * Given a partition path fragment, e.g. `fieldOne=1/fieldTwo=2`, returns a parsed spec - * for that fragment, e.g. `Map(("fieldOne", "1"), ("fieldTwo", "2"))`. + * for that fragment as a `TablePartitionSpec`, e.g. `Map(("fieldOne", "1"), ("fieldTwo", "2"))`. */ def parsePathFragment(pathFragment: String): TablePartitionSpec = { + parsePathFragmentAsSeq(pathFragment).toMap + } + + /** + * Given a partition path fragment, e.g. `fieldOne=1/fieldTwo=2`, returns a parsed spec + * for that fragment as a `Seq[(String, String)]`, e.g. + * `Seq(("fieldOne", "1"), ("fieldTwo", "2"))`. 
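+   * For example, a sketch of the expected behavior, assuming the standard `unescapePathName` rules: + * {{{ + * parsePathFragmentAsSeq("a=1/b=%25%3D") // => Seq(("a", "1"), ("b", "%=")) + * }}}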
+ */ + def parsePathFragmentAsSeq(pathFragment: String): Seq[(String, String)] = { pathFragment.split("/").map { kv => val pair = kv.split("=", 2) (unescapePathName(pair(0)), unescapePathName(pair(1))) - }.toMap + } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index c213e8e0b22e..862f0303eeaa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ @@ -927,6 +928,29 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat /** * Returns the partition names from hive metastore for a given table in a database. */ + override def listPartitionNames( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient { + val actualPartColNames = getTable(db, table).partitionColumnNames + val clientPartitionNames = + client.getPartitionNames(db, table, partialSpec.map(lowerCasePartitionSpec)) + + if (actualPartColNames.exists(partColName => partColName != partColName.toLowerCase)) { + clientPartitionNames.map { partName => + val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName) + partSpec.map { case (partName, partValue) => + actualPartColNames.find(_.equalsIgnoreCase(partName)).get + "=" + partValue + }.mkString("/") + } + } else { + clientPartitionNames + } + } + + /** + * Returns the partitions from hive metastore for a given table in a database. + */ override def listPartitions( db: String, table: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 4c76932b6175..05e813e3cefd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -156,6 +156,25 @@ private[hive] trait HiveClient { } } + /** + * Returns the partition names for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. + */ + final def getPartitionNames( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec]): Seq[String] = { + getPartitionNames(getTable(db, table), partialSpec) + } + + /** + * Returns the partition names for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. + */ + def getPartitionNames( + table: CatalogTable, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] + /** Returns the specified partition or None if it does not exist. 
*/ final def getPartitionOption( db: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bd840af5b164..569a79516e6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -519,6 +519,20 @@ private[hive] class HiveClientImpl( client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) } + /** + * Returns the partition names for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. + */ + override def getPartitionNames( + table: CatalogTable, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withHiveState { + partialSpec match { + case None => client.getPartitionNames(table.database, table.identifier.table, -1).asScala + case Some(s) => + client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1).asScala + } + } + override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 16ae345de6d9..9387d85d67a7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -254,6 +254,11 @@ class VersionsSuite extends SparkFunSuite with Logging { "default", "src_part", partitions, ignoreIfExists = true) } + test(s"$version: getPartitionNames(catalogTable)") { + assert(testPartitionCount == + client.getPartitionNames(client.getTable("default", "src_part")).size) + } + test(s"$version: getPartitions(catalogTable)") { assert(testPartitionCount == client.getPartitions(client.getTable("default", "src_part")).size) From 54171ada430c0e6ed0ecf0ebbb281f73b3dc7bb7 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Fri, 25 Nov 2016 12:21:57 -0800 Subject: [PATCH 02/12] Make `ExternalCatalog.listPartitionNames` abstract and implement that method in `InMemoryCatalog` --- .../catalyst/catalog/ExternalCatalog.scala | 19 +------------------ .../catalyst/catalog/InMemoryCatalog.scala | 13 +++++++++++++ 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index cb0332f337de..4a5446dad0af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -195,10 +195,6 @@ abstract class ExternalCatalog { * A partial partition spec may optionally be provided to filter the partitions returned. * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), * then a partial spec of (a='1') will return the first two only. - * - * We provide a default implementation here which simply delegates to the `listPartitions` - * method. For efficiency's sake, overriding this method is recommended for external catalogs - * that can list partition names directly. 
* @param db database name * @param table table name * @param partialSpec partition spec @@ -206,20 +202,7 @@ abstract class ExternalCatalog { def listPartitionNames( db: String, table: String, - partialSpec: Option[TablePartitionSpec] = None): Seq[String] = { - def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = { - partColNames.map { name => - ExternalCatalogUtils.escapePathName(name) + "=" + - ExternalCatalogUtils.escapePathName(spec(name)) - }.mkString("/") - } - - val catalogTable = getTable(db, table) - - listPartitions(db, table, partialSpec).map { p => - getPartName(p.spec, catalogTable.partitionColumnNames) - } - } + partialSpec: Option[TablePartitionSpec] = None): Seq[String] /** * List the metadata of all partitions that belong to the specified table, assuming it exists. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 880a7a0dc422..5bbf950b53cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -488,6 +488,19 @@ class InMemoryCatalog( } } + override def listPartitionNames( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = synchronized { + val partitionColumnNames = getTable(db, table).partitionColumnNames + + listPartitions(db, table, partialSpec).map { partition => + partitionColumnNames.map { name => + name + "=" + partition.spec(name) + }.mkString("/") + } + } + override def listPartitions( db: String, table: String, From 93cee977489fd5a599d091b23750f7164a84806d Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Sat, 26 Nov 2016 13:23:23 -0800 Subject: [PATCH 03/12] Insert a couple of cosmetic newlines --- .../org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 4a5446dad0af..ed03ef1880d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -195,6 +195,7 @@ abstract class ExternalCatalog { * A partial partition spec may optionally be provided to filter the partitions returned. * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), * then a partial spec of (a='1') will return the first two only. + * * @param db database name * @param table table name * @param partialSpec partition spec @@ -210,6 +211,7 @@ abstract class ExternalCatalog { * A partial partition spec may optionally be provided to filter the partitions returned. * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), * then a partial spec of (a='1') will return the first two only. 
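 * For example, `listPartitions(db, table, Some(Map("a" -> "1")))` is the call shape that returns the metadata of the first two partitions above.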
+ * * @param db database name * @param table table name * @param partialSpec partition spec From 27dc6720f657c131035d604c1e9de0cb564a05f2 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Sat, 26 Nov 2016 14:43:57 -0800 Subject: [PATCH 04/12] Modify `HiveCommandSuite` to make it test partition columns with uppercase characters in their names --- .../sql/hive/execution/HiveCommandSuite.scala | 103 ++++++++++-------- 1 file changed, 55 insertions(+), 48 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 46ed18c70fb5..940b4482a9b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -53,25 +53,28 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |TBLPROPERTIES('prop1Key'="prop1Val", '`prop2Key`'="prop2Val") """.stripMargin) sql("CREATE TABLE parquet_tab3(col1 int, `col 2` int)") - sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (year int, month int)") - sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 1) SELECT 1, 1") - sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 2) SELECT 2, 2") - sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 2) SELECT 3, 3") - sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 3) SELECT 3, 3") + + // NB: some table partition column names in this test suite have upper-case characters to test + // column name case preservation. Do not lowercase these partition names without good reason. + sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (Year int, Month int)") + sql("INSERT INTO parquet_tab4 PARTITION(Year = 2015, Month = 1) SELECT 1, 1") + sql("INSERT INTO parquet_tab4 PARTITION(Year = 2015, Month = 2) SELECT 2, 2") + sql("INSERT INTO parquet_tab4 PARTITION(Year = 2016, Month = 2) SELECT 3, 3") + sql("INSERT INTO parquet_tab4 PARTITION(Year = 2016, Month = 3) SELECT 3, 3") sql( """ |CREATE TABLE parquet_tab5 (price int, qty int) - |PARTITIONED BY (year int, month int, hour int, minute int, sec int, extra int) + |PARTITIONED BY (Year int, Month int, hour int, minute int, sec int, extra int) """.stripMargin) sql( """ |INSERT INTO parquet_tab5 - |PARTITION(year = 2016, month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 + |PARTITION(Year = 2016, Month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 """.stripMargin) sql( """ |INSERT INTO parquet_tab5 - |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 + |PARTITION(Year = 2016, Month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 """.stripMargin) sql("CREATE VIEW parquet_view1 as select * from parquet_tab4") } @@ -183,7 +186,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql( """ |CREATE TABLE part_table (employeeID INT, employeeName STRING) - |PARTITIONED BY (c STRING, d STRING) + |PARTITIONED BY (C STRING, d STRING) |ROW FORMAT DELIMITED |FIELDS TERMINATED BY '|' |LINES TERMINATED BY '\n' @@ -195,24 +198,24 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1")""") } 
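      // An incomplete partition spec, or one naming an unknown column, must fail analysis; each of the surrounding intercepts asserts one such case.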
intercept[AnalysisException] { sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""") } intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", k="2")""") } - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '2'"), sql("SELECT * FROM non_part_table").collect()) // Different order of partition columns. - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", C="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '2' AND d = '1'"), sql("SELECT * FROM non_part_table").collect()) } } @@ -296,38 +299,38 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql( """ |CREATE TABLE part_table (employeeID INT, employeeName STRING) - |PARTITIONED BY (c STRING, d STRING) + |PARTITIONED BY (C STRING, d STRING) |ROW FORMAT DELIMITED |FIELDS TERMINATED BY '|' |LINES TERMINATED BY '\n' """.stripMargin) - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="1")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", d="1")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '1'"), testResults) - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '2'"), testResults) - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="2", d="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="2", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '2' AND d = '2'"), testResults) - sql("TRUNCATE TABLE part_table PARTITION(c='1', d='1')") + sql("TRUNCATE TABLE part_table PARTITION(C='1', d='1')") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '1'"), Seq.empty[Row]) checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '2'"), testResults) - sql("TRUNCATE TABLE part_table PARTITION(c='1')") + sql("TRUNCATE TABLE part_table PARTITION(C='1')") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1'"), Seq.empty[Row]) sql("TRUNCATE TABLE part_table") @@ -341,40 
+344,40 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto test("show partitions - show everything") { checkAnswer( sql("show partitions parquet_tab4"), - Row("year=2015/month=1") :: - Row("year=2015/month=2") :: - Row("year=2016/month=2") :: - Row("year=2016/month=3") :: Nil) + Row("Year=2015/Month=1") :: + Row("Year=2015/Month=2") :: + Row("Year=2016/Month=2") :: + Row("Year=2016/Month=3") :: Nil) checkAnswer( sql("show partitions default.parquet_tab4"), - Row("year=2015/month=1") :: - Row("year=2015/month=2") :: - Row("year=2016/month=2") :: - Row("year=2016/month=3") :: Nil) + Row("Year=2015/Month=1") :: + Row("Year=2015/Month=2") :: + Row("Year=2016/Month=2") :: + Row("Year=2016/Month=3") :: Nil) } test("show partitions - show everything more than 5 part keys") { checkAnswer( sql("show partitions parquet_tab5"), - Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") :: - Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil) + Row("Year=2016/Month=3/hour=10/minute=10/sec=10/extra=1") :: + Row("Year=2016/Month=4/hour=10/minute=10/sec=10/extra=1") :: Nil) } test("show partitions - filter") { checkAnswer( - sql("show partitions default.parquet_tab4 PARTITION(year=2015)"), - Row("year=2015/month=1") :: - Row("year=2015/month=2") :: Nil) + sql("show partitions default.parquet_tab4 PARTITION(Year=2015)"), + Row("Year=2015/Month=1") :: + Row("Year=2015/Month=2") :: Nil) checkAnswer( - sql("show partitions default.parquet_tab4 PARTITION(year=2015, month=1)"), - Row("year=2015/month=1") :: Nil) + sql("show partitions default.parquet_tab4 PARTITION(Year=2015, Month=1)"), + Row("Year=2015/Month=1") :: Nil) checkAnswer( - sql("show partitions default.parquet_tab4 PARTITION(month=2)"), - Row("year=2015/month=2") :: - Row("year=2016/month=2") :: Nil) + sql("show partitions default.parquet_tab4 PARTITION(Month=2)"), + Row("Year=2015/Month=2") :: + Row("Year=2016/Month=2") :: Nil) } test("show partitions - empty row") { @@ -408,14 +411,18 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto test("show partitions - datasource") { withTable("part_datasrc") { - val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") + val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("A", "b", "c") df.write - .partitionBy("a") + .partitionBy("A") .format("parquet") .mode(SaveMode.Overwrite) .saveAsTable("part_datasrc") - assert(sql("SHOW PARTITIONS part_datasrc").count() == 3) + checkAnswer( + sql("SHOW PARTITIONS part_datasrc"), + Row("A=1") :: + Row("A=2") :: + Row("A=3") :: Nil) } } } From 2a7b06224e84fa027ee4dabf5e7ecf1118f81e78 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Mon, 28 Nov 2016 09:56:52 -0800 Subject: [PATCH 05/12] Formatting and code commenting --- .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 4 ++-- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index ed03ef1880d3..e0f44b7eadb5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -198,7 +198,7 @@ abstract class ExternalCatalog { * * @param db database name * @param table table name - * @param partialSpec partition spec + * @param 
partialSpec partition spec */ def listPartitionNames( db: String, @@ -214,7 +214,7 @@ abstract class ExternalCatalog { * * @param db database name * @param table table name - * @param partialSpec partition spec + * @param partialSpec partition spec */ def listPartitions( db: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 569a79516e6e..46dbd3cfa239 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -527,7 +527,9 @@ private[hive] class HiveClientImpl( table: CatalogTable, partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withHiveState { partialSpec match { - case None => client.getPartitionNames(table.database, table.identifier.table, -1).asScala + case None => + // "-1" means "do not limit the number of results" + client.getPartitionNames(table.database, table.identifier.table, -1).asScala case Some(s) => client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1).asScala } From d183946b1854323c928e307e999adc99f10722c2 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Tue, 29 Nov 2016 19:44:02 -0800 Subject: [PATCH 06/12] Code documentation and partition name/value path name escaping --- .../sql/catalyst/catalog/ExternalCatalog.scala | 11 +++++++---- .../sql/catalyst/catalog/InMemoryCatalog.scala | 3 ++- .../sql/catalyst/catalog/SessionCatalog.scala | 6 ++++++ .../spark/sql/hive/HiveExternalCatalog.scala | 17 +++++++---------- .../spark/sql/hive/client/HiveClientImpl.scala | 2 +- 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index e0f44b7eadb5..2b45d055fa32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -192,9 +192,12 @@ abstract class ExternalCatalog { /** * List the names of all partitions that belong to the specified table, assuming it exists. * - * A partial partition spec may optionally be provided to filter the partitions returned. - * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), - * then a partial spec of (a='1') will return the first two only. + * For a table with partition columns p1, p2, p3, each partition name is formatted as + * `p1=v1/p2=v2/p3=v3`. Each partition column name and value is an escaped path name, and can be + * decoded with the `ExternalCatalogUtils.unescapePathName` method. + * + * A partial partition spec may optionally be provided to filter the partitions returned, as + * described in the `listPartitions` method. 
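+   * For example, a partition with spec (p1='1', p2='%=') is listed as the escaped name + * `p1=1/p2=%25%3D`, a sketch assuming the `ExternalCatalogUtils.escapePathName` rules.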
* * @param db database name * @param table table name @@ -227,7 +230,7 @@ abstract class ExternalCatalog { * * @param db database name * @param table table name - * @param predicates partition-pruning predicates + * @param predicates partition-pruning predicates */ def listPartitionsByFilter( db: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 5bbf950b53cb..7d63948eeed1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.StringUtils @@ -496,7 +497,7 @@ class InMemoryCatalog( listPartitions(db, table, partialSpec).map { partition => partitionColumnNames.map { name => - name + "=" + partition.spec(name) + escapePathName(name) + "=" + escapePathName(partition.spec(name)) }.mkString("/") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 43f51eec6c98..7a3d2097a85c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -762,6 +762,9 @@ class SessionCatalog( val table = formatTableName(tableName.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Option(db))) + partialSpec.foreach { spec => + requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) + } externalCatalog.listPartitionNames(db, table, partialSpec) } @@ -779,6 +782,9 @@ class SessionCatalog( val table = formatTableName(tableName.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Option(db))) + partialSpec.foreach { spec => + requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) + } externalCatalog.listPartitions(db, table, partialSpec) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 862f0303eeaa..b6ed51071547 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -935,16 +936,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val actualPartColNames = getTable(db, table).partitionColumnNames val 
clientPartitionNames = client.getPartitionNames(db, table, partialSpec.map(lowerCasePartitionSpec)) - - if (actualPartColNames.exists(partColName => partColName != partColName.toLowerCase)) { - clientPartitionNames.map { partName => - val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName) - partSpec.map { case (partName, partValue) => - actualPartColNames.find(_.equalsIgnoreCase(partName)).get + "=" + partValue - }.mkString("/") - } - } else { - clientPartitionNames + clientPartitionNames.map { partName => + val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName) + partSpec.map { case (partName, partValue) => + escapePathName(actualPartColNames.find(_.equalsIgnoreCase(partName)).get) + "=" + + escapePathName(partValue) + }.mkString("/") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 46dbd3cfa239..fcdce0155662 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -528,7 +528,7 @@ private[hive] class HiveClientImpl( partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withHiveState { partialSpec match { case None => - // "-1" means "do not limit the number of results" + // -1 for result limit means "no limit/return all" client.getPartitionNames(table.database, table.identifier.table, -1).asScala case Some(s) => client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1).asScala From 9c71521cf6608b25e728d13ec5ba63c5a5ecc83e Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Thu, 1 Dec 2016 10:02:40 -0800 Subject: [PATCH 07/12] Add unit tests for the new `ExternalCatalog.listPartitionNames` method --- .../catalyst/catalog/ExternalCatalog.scala | 2 ++ .../catalyst/catalog/InMemoryCatalog.scala | 2 +- .../catalog/ExternalCatalogSuite.scala | 25 +++++++++++++++++++ .../spark/sql/hive/HiveExternalCatalog.scala | 1 + 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 2b45d055fa32..4b8cac8f32b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -196,6 +196,8 @@ abstract class ExternalCatalog { * `p1=v1/p2=v2/p3=v3`. Each partition column name and value is an escaped path name, and can be * decoded with the `ExternalCatalogUtils.unescapePathName` method. * + * The returned sequence is sorted as strings. + * * A partial partition spec may optionally be provided to filter the partitions returned, as * described in the `listPartitions` method. 
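 * For instance, partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4') are listed as * `Seq("a=1/b=2", "a=1/b=3", "a=2/b=4")`, and a partial spec of (a='1') keeps only the first two.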
* diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 7d63948eeed1..a6bebe1a3938 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -499,7 +499,7 @@ class InMemoryCatalog( partitionColumnNames.map { name => escapePathName(name) + "=" + escapePathName(partition.spec(name)) }.mkString("/") - } + }.sorted } override def listPartitions( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 3b39f420af49..00e663c324cb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -346,6 +346,31 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(new Path(partitionLocation) == defaultPartitionLocation) } + test("list partition names") { + val catalog = newBasicCatalog() + val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat) + catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false) + + val partitionNames = catalog.listPartitionNames("db2", "tbl2") + assert(partitionNames == Seq("a=1/b=%25%3D", "a=1/b=2", "a=3/b=4")) + } + + test("list partition names with partial partition spec") { + val catalog = newBasicCatalog() + val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat) + catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false) + + val partitionNames1 = catalog.listPartitionNames("db2", "tbl2", Some(Map("a" -> "1"))) + assert(partitionNames1 == Seq("a=1/b=%25%3D", "a=1/b=2")) + + // Partial partition specs including "weird" partition values should use the unescaped values + val partitionNames2 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%="))) + assert(partitionNames2 == Seq("a=1/b=%25%3D")) + + val partitionNames3 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%25%3D"))) + assert(partitionNames3.isEmpty) + } + test("list partitions with partial partition spec") { val catalog = newBasicCatalog() val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1"))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index b6ed51071547..748a5b1af448 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -936,6 +936,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val actualPartColNames = getTable(db, table).partitionColumnNames val clientPartitionNames = client.getPartitionNames(db, table, partialSpec.map(lowerCasePartitionSpec)) + // No need to sort the results, because according to the Hive wiki Hive sorts them for us clientPartitionNames.map { partName => val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName) partSpec.map { case (partName, partValue) => From b9dd303b9b58b5f4ddeb6e354c64f29ef9f9590c Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Thu, 1 Dec 2016 10:03:24 
-0800 Subject: [PATCH 08/12] Revert modifications to the HiveCommandSuite --- .../sql/hive/execution/HiveCommandSuite.scala | 103 ++++++++---------- 1 file changed, 48 insertions(+), 55 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 940b4482a9b7..46ed18c70fb5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -53,28 +53,25 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |TBLPROPERTIES('prop1Key'="prop1Val", '`prop2Key`'="prop2Val") """.stripMargin) sql("CREATE TABLE parquet_tab3(col1 int, `col 2` int)") - - // NB: some table partition column names in this test suite have upper-case characters to test - // column name case preservation. Do not lowercase these partition names without good reason. - sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (Year int, Month int)") - sql("INSERT INTO parquet_tab4 PARTITION(Year = 2015, Month = 1) SELECT 1, 1") - sql("INSERT INTO parquet_tab4 PARTITION(Year = 2015, Month = 2) SELECT 2, 2") - sql("INSERT INTO parquet_tab4 PARTITION(Year = 2016, Month = 2) SELECT 3, 3") - sql("INSERT INTO parquet_tab4 PARTITION(Year = 2016, Month = 3) SELECT 3, 3") + sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (year int, month int)") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 1) SELECT 1, 1") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2015, month = 2) SELECT 2, 2") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 2) SELECT 3, 3") + sql("INSERT INTO parquet_tab4 PARTITION(year = 2016, month = 3) SELECT 3, 3") sql( """ |CREATE TABLE parquet_tab5 (price int, qty int) - |PARTITIONED BY (Year int, Month int, hour int, minute int, sec int, extra int) + |PARTITIONED BY (year int, month int, hour int, minute int, sec int, extra int) """.stripMargin) sql( """ |INSERT INTO parquet_tab5 - |PARTITION(Year = 2016, Month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 + |PARTITION(year = 2016, month = 3, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 """.stripMargin) sql( """ |INSERT INTO parquet_tab5 - |PARTITION(Year = 2016, Month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 + |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3 """.stripMargin) sql("CREATE VIEW parquet_view1 as select * from parquet_tab4") } @@ -186,7 +183,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql( """ |CREATE TABLE part_table (employeeID INT, employeeName STRING) - |PARTITIONED BY (C STRING, d STRING) + |PARTITIONED BY (c STRING, d STRING) |ROW FORMAT DELIMITED |FIELDS TERMINATED BY '|' |LINES TERMINATED BY '\n' @@ -198,24 +195,24 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""") } intercept[AnalysisException] { sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""") } intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", k="2")""") + sql(s"""LOAD DATA LOCAL 
INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""") } - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", d="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), sql("SELECT * FROM non_part_table").collect()) // Different order of partition columns. - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", C="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '2' AND d = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"), sql("SELECT * FROM non_part_table").collect()) } } @@ -299,38 +296,38 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql( """ |CREATE TABLE part_table (employeeID INT, employeeName STRING) - |PARTITIONED BY (C STRING, d STRING) + |PARTITIONED BY (c STRING, d STRING) |ROW FORMAT DELIMITED |FIELDS TERMINATED BY '|' |LINES TERMINATED BY '\n' """.stripMargin) - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", d="1")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="1")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '1'"), testResults) - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="1", d="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), testResults) - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(C="2", d="2")""") + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="2", d="2")""") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '2' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '2'"), testResults) - sql("TRUNCATE TABLE part_table PARTITION(C='1', d='1')") + sql("TRUNCATE TABLE part_table PARTITION(c='1', d='1')") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '1'"), Seq.empty[Row]) checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1' AND d = '2'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"), testResults) - sql("TRUNCATE TABLE part_table PARTITION(C='1')") + sql("TRUNCATE TABLE part_table PARTITION(c='1')") checkAnswer( - sql("SELECT employeeID, employeeName FROM part_table WHERE C = '1'"), + sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1'"), Seq.empty[Row]) sql("TRUNCATE TABLE part_table") @@ -344,40 +341,40 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto test("show partitions - show everything") { checkAnswer( sql("show partitions parquet_tab4"), - Row("Year=2015/Month=1") :: - Row("Year=2015/Month=2") :: - Row("Year=2016/Month=2") :: 
- Row("Year=2016/Month=3") :: Nil) + Row("year=2015/month=1") :: + Row("year=2015/month=2") :: + Row("year=2016/month=2") :: + Row("year=2016/month=3") :: Nil) checkAnswer( sql("show partitions default.parquet_tab4"), - Row("Year=2015/Month=1") :: - Row("Year=2015/Month=2") :: - Row("Year=2016/Month=2") :: - Row("Year=2016/Month=3") :: Nil) + Row("year=2015/month=1") :: + Row("year=2015/month=2") :: + Row("year=2016/month=2") :: + Row("year=2016/month=3") :: Nil) } test("show partitions - show everything more than 5 part keys") { checkAnswer( sql("show partitions parquet_tab5"), - Row("Year=2016/Month=3/hour=10/minute=10/sec=10/extra=1") :: - Row("Year=2016/Month=4/hour=10/minute=10/sec=10/extra=1") :: Nil) + Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") :: + Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil) } test("show partitions - filter") { checkAnswer( - sql("show partitions default.parquet_tab4 PARTITION(Year=2015)"), - Row("Year=2015/Month=1") :: - Row("Year=2015/Month=2") :: Nil) + sql("show partitions default.parquet_tab4 PARTITION(year=2015)"), + Row("year=2015/month=1") :: + Row("year=2015/month=2") :: Nil) checkAnswer( - sql("show partitions default.parquet_tab4 PARTITION(Year=2015, Month=1)"), - Row("Year=2015/Month=1") :: Nil) + sql("show partitions default.parquet_tab4 PARTITION(year=2015, month=1)"), + Row("year=2015/month=1") :: Nil) checkAnswer( - sql("show partitions default.parquet_tab4 PARTITION(Month=2)"), - Row("Year=2015/Month=2") :: - Row("Year=2016/Month=2") :: Nil) + sql("show partitions default.parquet_tab4 PARTITION(month=2)"), + Row("year=2015/month=2") :: + Row("year=2016/month=2") :: Nil) } test("show partitions - empty row") { @@ -411,18 +408,14 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto test("show partitions - datasource") { withTable("part_datasrc") { - val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("A", "b", "c") + val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") df.write - .partitionBy("A") + .partitionBy("a") .format("parquet") .mode(SaveMode.Overwrite) .saveAsTable("part_datasrc") - checkAnswer( - sql("SHOW PARTITIONS part_datasrc"), - Row("A=1") :: - Row("A=2") :: - Row("A=3") :: Nil) + assert(sql("SHOW PARTITIONS part_datasrc").count() == 3) } } } From 28563d4d5bd9c25dafbcc17215dff85c435615d8 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Thu, 1 Dec 2016 11:50:15 -0800 Subject: [PATCH 09/12] Explicitly sort the results from `HiveClient.getPartitionNames` and add coverage to `VersionsSuite` --- .../spark/sql/hive/HiveExternalCatalog.scala | 6 +++--- .../spark/sql/hive/client/HiveClient.scala | 13 ++----------- .../spark/sql/hive/client/HiveClientImpl.scala | 18 +++++++++++------- .../spark/sql/hive/client/VersionsSuite.scala | 4 ++-- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 748a5b1af448..d3e66baa42ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -933,10 +933,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient { - val actualPartColNames = getTable(db, table).partitionColumnNames + val catalogTable = getTable(db, 
table) + val actualPartColNames = catalogTable.partitionColumnNames val clientPartitionNames = - client.getPartitionNames(db, table, partialSpec.map(lowerCasePartitionSpec)) - // No need to sort the results, because according to the Hive wiki Hive sorts them for us + client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec)) clientPartitionNames.map { partName => val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName) partSpec.map { case (partName, partValue) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 05e813e3cefd..8e7c871183df 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -159,17 +159,8 @@ private[hive] trait HiveClient { /** * Returns the partition names for the given table that match the supplied partition spec. * If no partition spec is specified, all partitions are returned. - */ - final def getPartitionNames( - db: String, - table: String, - partialSpec: Option[TablePartitionSpec]): Seq[String] = { - getPartitionNames(getTable(db, table), partialSpec) - } - - /** - * Returns the partition names for the given table that match the supplied partition spec. - * If no partition spec is specified, all partitions are returned. + * + * The returned sequence is sorted as strings. */ def getPartitionNames( table: CatalogTable, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index fcdce0155662..db73596e5f52 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -522,17 +522,21 @@ private[hive] class HiveClientImpl( /** * Returns the partition names for the given table that match the supplied partition spec. * If no partition spec is specified, all partitions are returned. + * + * The returned sequence is sorted as strings. 
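+   * Note that the ordering is lexicographic on the full name strings, so, for example, + * `key=10` sorts before `key=9`.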
*/ override def getPartitionNames( table: CatalogTable, partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withHiveState { - partialSpec match { - case None => - // -1 for result limit means "no limit/return all" - client.getPartitionNames(table.database, table.identifier.table, -1).asScala - case Some(s) => - client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1).asScala - } + val hivePartitionNames = + partialSpec match { + case None => + // -1 for result limit means "no limit/return all" + client.getPartitionNames(table.database, table.identifier.table, -1) + case Some(s) => + client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1) + } + hivePartitionNames.asScala.sorted } override def getPartitionOption( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 9387d85d67a7..79e76b3134c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -255,8 +255,8 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: getPartitionNames(catalogTable)") { - assert(testPartitionCount == - client.getPartitionNames(client.getTable("default", "src_part")).size) + val partitionNames = (1 to testPartitionCount).map(key2 => s"key1=1/key2=$key2") + assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part"))) } test(s"$version: getPartitions(catalogTable)") { From 860d985bf0f596ade444d0b5c85e48f1d1c21eab Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Fri, 2 Dec 2016 10:28:45 -0800 Subject: [PATCH 10/12] Build and use a map of lower-cased partition column names to exact partition column names when listing partitions and partition names --- .../spark/sql/hive/HiveExternalCatalog.scala | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index d3e66baa42ba..f67ddc9be1a5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -814,9 +814,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat spec.map { case (k, v) => k.toLowerCase -> v } } + // Build a map from lower-cased partition column names to exact column names for a given table + private def buildLowerCasePartColNameMap(table: CatalogTable): Map[String, String] = { + val actualPartColNames = table.partitionColumnNames + actualPartColNames.map(colName => (colName.toLowerCase, colName)).toMap + } + // Hive metastore is not case preserving and the column names of the partition specification we // get from the metastore are always lower cased. We should restore them w.r.t. the actual table // partition columns. 
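  // For example, with a table partitioned by `Year`, a metastore spec of Map("year" -> "2015") // is restored to Map("Year" -> "2015") using the lower-cased name map built above.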
+ private def restorePartitionSpec( + spec: TablePartitionSpec, + partColMap: Map[String, String]): TablePartitionSpec = { + spec.map { case (k, v) => partColMap(k.toLowerCase) -> v } + } + private def restorePartitionSpec( spec: TablePartitionSpec, partCols: Seq[String]): TablePartitionSpec = { @@ -934,14 +946,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient { val catalogTable = getTable(db, table) - val actualPartColNames = catalogTable.partitionColumnNames + val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName) val clientPartitionNames = client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec)) clientPartitionNames.map { partName => val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName) partSpec.map { case (partName, partValue) => - escapePathName(actualPartColNames.find(_.equalsIgnoreCase(partName)).get) + "=" + - escapePathName(partValue) + partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue) }.mkString("/") } } @@ -953,9 +964,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient { - val actualPartColNames = getTable(db, table).partitionColumnNames + val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table)) client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part => - part.copy(spec = restorePartitionSpec(part.spec, actualPartColNames)) + part.copy(spec = restorePartitionSpec(part.spec, partColNameMap)) } } @@ -976,10 +987,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } val partitionSchema = catalogTable.partitionSchema + val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table)) if (predicates.nonEmpty) { val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part => - part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames)) + part.copy(spec = restorePartitionSpec(part.spec, partColNameMap)) } val boundPredicate = InterpretedPredicate.create(predicates.reduce(And).transform { @@ -990,7 +1002,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) } } else { client.getPartitions(catalogTable).map { part => - part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames)) + part.copy(spec = restorePartitionSpec(part.spec, partColNameMap)) } } } From fc57f234206ac2cf96dcc9b9e4b9eb18154ef7c7 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Fri, 2 Dec 2016 10:29:39 -0800 Subject: [PATCH 11/12] Add tests for `SessionCatalog.listPartitionNames` and enhance coverage of `SessionCatalog.listPartitions` to include test of invalid partial partition specs --- .../catalog/SessionCatalogSuite.scala | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index f9c4b2687bf7..5cc772d8e9a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ 
---
 .../catalog/SessionCatalogSuite.scala | 39 +++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index f9c4b2687bf7..5cc772d8e9a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -878,6 +878,31 @@ class SessionCatalogSuite extends SparkFunSuite {
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
   }

+  test("list partition names") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4")
+    assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) ==
+      expectedPartitionNames)
+    // List partition names without explicitly specifying database
+    catalog.setCurrentDatabase("db2")
+    assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames)
+  }
+
+  test("list partition names with partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    assert(
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) ==
+      Seq("a=1/b=2"))
+  }
+
+  test("list partition names with invalid partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(Map("unknown" -> "unknown")))
+    }
+  }
+
   test("list partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalogPartitionsEqual(
@@ -887,6 +912,20 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
   }

+  test("list partitions with partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    assert(catalogPartitionsEqual(
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1))
+  }
+
+  test("list partitions with invalid partial partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.listPartitions(
+        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    }
+  }
+
   test("list partitions when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[NoSuchDatabaseException] {

From 37fc5955b2a26caa5ef133d19e415172a26330a2 Mon Sep 17 00:00:00 2001
From: Michael Allman
Date: Mon, 5 Dec 2016 12:13:23 -0800
Subject: [PATCH 12/12] Compute the case-corrected `staticPartitionKeys` before
 it is used to list matching partitions in DataSourceStrategy.scala
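
Previously the partial spec passed to `listPartitions` was
`overwrite.staticPartitionKeys`, whose keys keep whatever case the user wrote
in the INSERT statement; the case-corrected `staticPartitionKeys` map was only
computed further down. Hoisting that computation means the catalog is queried
with the table's exact partition column names. A minimal sketch of the key
normalization being reused, assuming a hypothetical table partitioned by a
column `partCol`:

    // Normalize user-supplied static partition keys to the schema's exact column names.
    val partitionColumnNames = Seq("partCol") // exact-case names from the table schema
    val userKeys = Map("PARTCOL" -> "1") // keys as written in the INSERT statement
    val staticPartitionKeys = userKeys.map { case (k, v) =>
      partitionColumnNames.find(_.equalsIgnoreCase(k)).get -> v
    }
    // staticPartitionKeys == Map("partCol" -> "1")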
Patch provided by @gatorsmile --- .../datasources/DataSourceStrategy.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4468dc58e404..03eed251763b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -161,8 +161,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query)) - case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false) + case logical.InsertIntoTable( + l @ LogicalRelation(t: HadoopFsRelation, _, table), _, query, overwrite, false) if query.resolved && t.schema.sameType(query.schema) => // Sanity checks @@ -192,11 +192,19 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty + val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) { + overwrite.staticPartitionKeys.map { case (k, v) => + (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v) + } + } else { + Map.empty + } + // When partitions are tracked by the catalog, compute all custom partition locations that // may be relevant to the insertion job. if (partitionsTrackedByCatalog) { val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions( - l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys)) + l.catalogTable.get.identifier, Some(staticPartitionKeys)) initialMatchingPartitions = matchingPartitions.map(_.spec) customPartitionLocations = getCustomPartitionLocations( t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions) @@ -225,14 +233,6 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { t.location.refresh() } - val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) { - overwrite.staticPartitionKeys.map { case (k, v) => - (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v) - } - } else { - Map.empty - } - val insertCmd = InsertIntoHadoopFsRelationCommand( outputPath, staticPartitionKeys,