From b1ea87ed2757dfa5fa48e772575a20959050ef8f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 13:47:45 -0700 Subject: [PATCH 01/14] Throw exception for MSCK REPAIR TABLE --- .../org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 6 ++++-- .../apache/spark/sql/hive/HiveDDLCommandSuite.scala | 12 ++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 8a45b4f2e1c2a..e9bee9b3ec480 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -148,7 +148,7 @@ hiveNativeCommands | ROLLBACK WORK? | SHOW PARTITIONS tableIdentifier partitionSpec? | DFS .*? - | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | MSCK | LOAD) .*? + | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*? ; unsupportedHiveNativeCommands @@ -177,6 +177,7 @@ unsupportedHiveNativeCommands | kw1=UNLOCK kw2=DATABASE | kw1=CREATE kw2=TEMPORARY kw3=MACRO | kw1=DROP kw2=TEMPORARY kw3=MACRO + | kw1=MSCK kw2=REPAIR kw3=TABLE ; createTableHeader @@ -650,7 +651,7 @@ nonReserved | AFTER | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER - | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE + | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION ; @@ -866,6 +867,7 @@ GRANT: 'GRANT'; LOCK: 'LOCK'; UNLOCK: 'UNLOCK'; MSCK: 'MSCK'; +REPAIR: 'REPAIR'; EXPORT: 'EXPORT'; IMPORT: 'IMPORT'; LOAD: 'LOAD'; diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index c5f01da4fabbb..9c774eae9f089 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -41,6 +41,13 @@ class HiveDDLCommandSuite extends PlanTest { }.head } + private def assertUnsupported(sql: String): Unit = { + val e = intercept[ParseException] { + parser.parsePlan(sql) + } + assert(e.getMessage.toLowerCase.contains("unsupported")) + } + test("Test CTAS #1") { val s1 = """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view @@ -305,4 +312,9 @@ class HiveDDLCommandSuite extends PlanTest { parser.parsePlan(v1).isInstanceOf[HiveNativeCommand] } } + + test("MSCK repair table is not supported") { + assertUnsupported("MSCK REPAIR TABLE tab1") + } + } From 1690596460f93377c8192763666a7767b2a17bda Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 14:07:48 -0700 Subject: [PATCH 02/14] Throw exception on (un)archive partition --- .../spark/sql/execution/SparkSqlParser.scala | 12 +++------ .../spark/sql/execution/command/ddl.scala | 10 ------- .../execution/command/DDLCommandSuite.scala | 27 +++++++++---------- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 3de8aa02766dc..95dea9e56d78b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -567,10 +567,8 @@ class SparkSqlAstBuilder extends AstBuilder { */ override def visitArchiveTablePartition( ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) { - AlterTableArchivePartition( - visitTableIdentifier(ctx.tableIdentifier), - visitNonOptionalPartitionSpec(ctx.partitionSpec))( - command(ctx)) + throw new AnalysisException( + "Operation not allowed: ALTER TABLE ... ARCHIVE PARTITION ...") } /** @@ -583,10 +581,8 @@ class SparkSqlAstBuilder extends AstBuilder { */ override def visitUnarchiveTablePartition( ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) { - AlterTableUnarchivePartition( - visitTableIdentifier(ctx.tableIdentifier), - visitNonOptionalPartitionSpec(ctx.partitionSpec))( - command(ctx)) + throw new AnalysisException( + "Operation not allowed: ALTER TABLE ... UNARCHIVE PARTITION ...") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6d56a6fec86e9..57952ed8f34bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -340,16 +340,6 @@ case class AlterTableDropPartition( purge: Boolean)(sql: String) extends NativeDDLCommand(sql) with Logging -case class AlterTableArchivePartition( - tableName: TableIdentifier, - spec: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging - -case class AlterTableUnarchivePartition( - tableName: TableIdentifier, - spec: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging - case class AlterTableSetFileFormat( tableName: TableIdentifier, partitionSpec: Option[TablePartitionSpec], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 8e63b69876501..feb294c44cd31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest @@ -25,9 +26,17 @@ import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ +// TODO: merge this with DDLSuite (SPARK-14441) class DDLCommandSuite extends PlanTest { private val parser = SparkSqlParser + private def assertUnsupported(sql: String): Unit = { + val e = intercept[AnalysisException] { + parser.parsePlan(sql) + } + assert(e.getMessage.toLowerCase.contains("operation not allowed")) + } + test("create database") { val sql = """ @@ -435,22 +444,12 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed2_table, expected2_table) } - test("alter table: archive partition") { - val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')" - val parsed = parser.parsePlan(sql) - val expected = AlterTableArchivePartition( - TableIdentifier("table_name", None), - Map("dt" -> "2008-08-08", "country" 
-> "us"))(sql) - comparePlans(parsed, expected) + test("alter table: archive partition (not supported)") { + assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')") } - test("alter table: unarchive partition") { - val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')" - val parsed = parser.parsePlan(sql) - val expected = AlterTableUnarchivePartition( - TableIdentifier("table_name", None), - Map("dt" -> "2008-08-08", "country" -> "us"))(sql) - comparePlans(parsed, expected) + test("alter table: unarchive partition (not supported)") { + assertUnsupported("ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')") } test("alter table: set file format") { From e24e007a1a8e6dfc03c7477788af7fbc6bdd6a11 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 14:45:54 -0700 Subject: [PATCH 03/14] Implement add partitions --- .../spark/sql/execution/command/ddl.scala | 28 +++++++- .../sql/execution/command/DDLSuite.scala | 66 +++++++++++++++++-- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 57952ed8f34bb..6c2272d840f63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.command import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.types._ @@ -306,12 +307,35 @@ case class AlterTableSerDeProperties( * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE, * EXCEPT that it is ILLEGAL to specify a LOCATION clause. * An error message will be issued if the partition exists, unless 'ifNotExists' is true. 
+ * + * The syntax of this command is: + * {{{ + * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1'] + * ALTER VIEW view ADD [IF NOT EXISTS] PARTITION spec + * }}} */ case class AlterTableAddPartition( tableName: TableIdentifier, partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])], ifNotExists: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val table = catalog.getTable(tableName) + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table add partition is not allowed for tables defined using the datasource API") + } + val parts = partitionSpecsAndLocs.map { case (spec, location) => + // inherit table storage format (possibly except for location) + CatalogTablePartition(spec, table.storage.copy(locationUri = location)) + } + catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists) + Seq.empty[Row] + } + +} case class AlterTableRenamePartition( tableName: TableIdentifier, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index a8db4e9923937..1ef3943d73560 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -58,6 +58,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e.getMessage.toLowerCase.contains("operation not allowed")) } + private def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = { + if (expectException) intercept[AnalysisException] { body } else body + } + private def createDatabase(catalog: SessionCatalog, name: String): Unit = { catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false) } @@ -319,6 +323,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES") } + test("alter table: add partition") { + testAddPartitions(isDatasourceTable = false) + } + + test("alter table: add partition (datasource table)") { + testAddPartitions(isDatasourceTable = true) + } + // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext test("show tables") { @@ -428,10 +440,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(storageFormat.locationUri === Some(expected)) } } - // Optionally expect AnalysisException - def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = { - if (expectException) intercept[AnalysisException] { body } else body - } // set table location sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'") verifyLocation("/path/to/your/lovely/heart") @@ -505,4 +513,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + private def testAddPartitions(isDatasourceTable: Boolean): Unit = { + val catalog = sqlContext.sessionState.catalog + val tableIdent = TableIdentifier("tab1", Some("dbx")) + val part1 = Map("a" -> "1") + val part2 = Map("b" -> "2") + val part3 = Map("c" -> "3") + val part4 = Map("d" -> "4") + createDatabase(catalog, "dbx") + createTable(catalog, tableIdent) + createTablePartition(catalog, part1, tableIdent) + if (isDatasourceTable) { + convertToDatasourceTable(catalog, 
tableIdent) + } + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1)) + maybeWrapException(isDatasourceTable) { + sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " + + "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')") + } + if (!isDatasourceTable) { + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3)) + assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty) + assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris")) + assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty) + } + // add partitions without explicitly specifying database + catalog.setCurrentDatabase("dbx") + maybeWrapException(isDatasourceTable) { + sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')") + } + if (!isDatasourceTable) { + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(part1, part2, part3, part4)) + } + // table to alter does not exist + intercept[AnalysisException] { + sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')") + } + // partition to add already exists + intercept[AnalysisException] { + sql("ALTER TABLE tab1 ADD PARTITION (d='4')") + } + maybeWrapException(isDatasourceTable) { + sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')") + } + if (!isDatasourceTable) { + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(part1, part2, part3, part4)) + } + } + } From 18de2b39dd25bfd0caf42b83326523e7c46d66b2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 14:48:08 -0700 Subject: [PATCH 04/14] Throw exception for exchange partition --- .../spark/sql/execution/SparkSqlParser.scala | 10 +++------- .../apache/spark/sql/execution/command/ddl.scala | 8 +------- .../sql/execution/command/DDLCommandSuite.scala | 16 +++++----------- .../spark/sql/hive/HiveDDLCommandSuite.scala | 2 +- 4 files changed, 10 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 95dea9e56d78b..771f2320a84fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -496,8 +496,7 @@ class SparkSqlAstBuilder extends AstBuilder { AlterTableAddPartition( visitTableIdentifier(ctx.tableIdentifier), specsAndLocs, - ctx.EXISTS != null)( - command(ctx)) + ctx.EXISTS != null) } /** @@ -510,11 +509,8 @@ class SparkSqlAstBuilder extends AstBuilder { */ override def visitExchangeTablePartition( ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) { - AlterTableExchangePartition( - visitTableIdentifier(ctx.from), - visitTableIdentifier(ctx.to), - visitNonOptionalPartitionSpec(ctx.partitionSpec))( - command(ctx)) + throw new AnalysisException( + "Operation not allowed: ALTER TABLE ... 
EXCHANGE PARTITION ...") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6c2272d840f63..6deb8052c741f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -317,7 +317,7 @@ case class AlterTableSerDeProperties( case class AlterTableAddPartition( tableName: TableIdentifier, partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])], - ifNotExists: Boolean)(sql: String) + ifNotExists: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { @@ -343,12 +343,6 @@ case class AlterTableRenamePartition( newPartition: TablePartitionSpec)(sql: String) extends NativeDDLCommand(sql) with Logging -case class AlterTableExchangePartition( - fromTableName: TableIdentifier, - toTableName: TableIdentifier, - spec: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging - /** * Drop Partition in ALTER TABLE/VIEW: to drop a particular partition for a table/view. * This removes the data and metadata for this partition. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index feb294c44cd31..0858d87130205 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -336,11 +336,11 @@ class DDLCommandSuite extends PlanTest { Seq( (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")), (Map("dt" -> "2009-09-09", "country" -> "uk"), None)), - ifNotExists = true)(sql1) + ifNotExists = true) val expected2 = AlterTableAddPartition( TableIdentifier("table_name", None), Seq((Map("dt" -> "2008-08-08"), Some("loc"))), - ifNotExists = false)(sql2) + ifNotExists = false) comparePlans(parsed1, expected1) comparePlans(parsed2, expected2) @@ -383,18 +383,12 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed, expected) } - test("alter table: exchange partition") { - val sql = + test("alter table: exchange partition (not supported)") { + assertUnsupported( """ |ALTER TABLE table_name_1 EXCHANGE PARTITION |(dt='2008-08-08', country='us') WITH TABLE table_name_2 - """.stripMargin - val parsed = parser.parsePlan(sql) - val expected = AlterTableExchangePartition( - TableIdentifier("table_name_1", None), - TableIdentifier("table_name_2", None), - Map("dt" -> "2008-08-08", "country" -> "us"))(sql) - comparePlans(parsed, expected) + """.stripMargin) } // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] 
[PURGE] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 9c774eae9f089..5993d155eab27 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -313,7 +313,7 @@ class HiveDDLCommandSuite extends PlanTest { } } - test("MSCK repair table is not supported") { + test("MSCK repair table (not supported)") { assertUnsupported("MSCK REPAIR TABLE tab1") } From 70586761f48fc98ea6c8131445c0606e3e3b129d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 15:13:08 -0700 Subject: [PATCH 05/14] Implement drop partitions --- .../catalyst/catalog/InMemoryCatalog.scala | 3 +- .../sql/catalyst/catalog/SessionCatalog.scala | 5 +- .../sql/catalyst/catalog/interface.scala | 3 +- .../catalyst/catalog/CatalogTestCases.scala | 18 ++++-- .../catalog/SessionCatalogSuite.scala | 35 +++++++++--- .../spark/sql/execution/SparkSqlParser.scala | 3 +- .../spark/sql/execution/command/ddl.scala | 23 +++++++- .../execution/command/DDLCommandSuite.scala | 4 +- .../sql/execution/command/DDLSuite.scala | 56 +++++++++++++++++++ .../spark/sql/hive/HiveExternalCatalog.scala | 5 +- .../spark/sql/hive/client/HiveClient.scala | 3 +- .../sql/hive/client/HiveClientImpl.scala | 5 +- 12 files changed, 135 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 5d136b663f30c..f890720dd5f95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -232,7 +232,8 @@ class InMemoryCatalog extends ExternalCatalog { db: String, table: String, partSpecs: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = synchronized { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = synchronized { requireTableExists(db, table) val existingParts = catalog(db).tables(table).partitions if (!ignoreIfNotExists) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 2acf584e8ff01..f5e054f9fd745 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -362,10 +362,11 @@ class SessionCatalog( def dropPartitions( tableName: TableIdentifier, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) val table = formatTableName(tableName.table) - externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) + externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists, purge) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 97b9946140c5a..1b8fe7ec03a71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -111,7 +111,8 @@ 
abstract class ExternalCatalog { db: String, table: String, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit + ignoreIfNotExists: Boolean, + purge: Boolean): Unit /** * Override the specs of one or many existing table partitions, assuming they exist. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index fbcac09ce223f..dd713b0f086ef 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -272,31 +272,37 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { test("drop partitions") { val catalog = newBasicCatalog() assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) - catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false) + catalog.dropPartitions( + "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = true) assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) resetState() val catalog2 = newBasicCatalog() assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) - catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false) + catalog2.dropPartitions( + "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = true) assert(catalog2.listPartitions("db2", "tbl2").isEmpty) } test("drop partitions when database/table does not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false) + catalog.dropPartitions( + "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = true) } intercept[AnalysisException] { - catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false) + catalog.dropPartitions( + "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = true) } } test("drop partitions that do not exist") { val catalog = newBasicCatalog() intercept[AnalysisException] { - catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false) + catalog.dropPartitions( + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = true) } - catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true) + catalog.dropPartitions( + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = true) } test("get partition") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 4d56d001b3e7d..c530d43989aa5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -497,19 +497,28 @@ class SessionCatalogSuite extends SparkFunSuite { val sessionCatalog = new SessionCatalog(externalCatalog) assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2))) sessionCatalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false) + TableIdentifier("tbl2", Some("db2")), + Seq(part1.spec), + ignoreIfNotExists = false, + purge = true) assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2))) // 
Drop partitions without explicitly specifying database sessionCatalog.setCurrentDatabase("db2") sessionCatalog.dropPartitions( - TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false) + TableIdentifier("tbl2"), + Seq(part2.spec), + ignoreIfNotExists = false, + purge = true) assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) // Drop multiple partitions at once sessionCatalog.createPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false) assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2))) sessionCatalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false) + TableIdentifier("tbl2", Some("db2")), + Seq(part1.spec, part2.spec), + ignoreIfNotExists = false, + purge = true) assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) } @@ -517,11 +526,17 @@ class SessionCatalogSuite extends SparkFunSuite { val catalog = new SessionCatalog(newBasicCatalog()) intercept[AnalysisException] { catalog.dropPartitions( - TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfNotExists = false) + TableIdentifier("tbl1", Some("does_not_exist")), + Seq(), + ignoreIfNotExists = false, + purge = true) } intercept[AnalysisException] { catalog.dropPartitions( - TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false) + TableIdentifier("does_not_exist", Some("db2")), + Seq(), + ignoreIfNotExists = false, + purge = true) } } @@ -529,10 +544,16 @@ class SessionCatalogSuite extends SparkFunSuite { val catalog = new SessionCatalog(newBasicCatalog()) intercept[AnalysisException] { catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false) + TableIdentifier("tbl2", Some("db2")), + Seq(part3.spec), + ignoreIfNotExists = false, + purge = true) } catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true) + TableIdentifier("tbl2", Some("db2")), + Seq(part3.spec), + ignoreIfNotExists = true, + purge = true) } test("get partition") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 771f2320a84fc..7b2e4c2af21a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -549,8 +549,7 @@ class SparkSqlAstBuilder extends AstBuilder { visitTableIdentifier(ctx.tableIdentifier), ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), ctx.EXISTS != null, - ctx.PURGE != null)( - command(ctx)) + ctx.PURGE != null) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6deb8052c741f..a6a6e44a30f54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -350,13 +350,32 @@ case class AlterTableRenamePartition( * unless 'purge' is true, but the metadata is completely lost. * An error message will be issued if the partition does not exist, unless 'ifExists' is true. * Note: purge is always false when the target is a view. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] 
[PURGE]; + * ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]; + * }}} */ case class AlterTableDropPartition( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], ifExists: Boolean, - purge: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging + purge: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val table = catalog.getTable(tableName) + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + "alter table drop partition is not allowed for tables defined using the datasource API") + } + catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge) + Seq.empty[Row] + } + +} case class AlterTableSetFileFormat( tableName: TableIdentifier, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 0858d87130205..9956ef711eff7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -425,14 +425,14 @@ class DDLCommandSuite extends PlanTest { Map("dt" -> "2008-08-08", "country" -> "us"), Map("dt" -> "2009-09-09", "country" -> "uk")), ifExists = true, - purge = false)(sql1_table) + purge = false) val expected2_table = AlterTableDropPartition( tableIdent, Seq( Map("dt" -> "2008-08-08", "country" -> "us"), Map("dt" -> "2009-09-09", "country" -> "uk")), ifExists = false, - purge = true)(sql2_table) + purge = true) comparePlans(parsed1_table, expected1_table) comparePlans(parsed2_table, expected2_table) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 1ef3943d73560..9bfef9d788dc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -331,6 +331,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { testAddPartitions(isDatasourceTable = true) } + test("alter table: drop partition") { + testDropPartitions(isDatasourceTable = false) + } + + test("alter table: drop partition (datasource table)") { + testDropPartitions(isDatasourceTable = true) + } + // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext test("show tables") { @@ -563,4 +571,52 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + private def testDropPartitions(isDatasourceTable: Boolean): Unit = { + val catalog = sqlContext.sessionState.catalog + val tableIdent = TableIdentifier("tab1", Some("dbx")) + val part1 = Map("a" -> "1") + val part2 = Map("b" -> "2") + val part3 = Map("c" -> "3") + val part4 = Map("d" -> "4") + createDatabase(catalog, "dbx") + createTable(catalog, tableIdent) + createTablePartition(catalog, part1, tableIdent) + createTablePartition(catalog, part2, tableIdent) + createTablePartition(catalog, part3, tableIdent) + createTablePartition(catalog, part4, tableIdent) + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(part1, part2, part3, part4)) + if (isDatasourceTable) { + convertToDatasourceTable(catalog, tableIdent) + } + maybeWrapException(isDatasourceTable) { + sql("ALTER TABLE dbx.tab1 DROP IF EXISTS 
PARTITION (d='4'), PARTITION (c='3')") + } + if (!isDatasourceTable) { + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2)) + } + // drop partitions without explicitly specifying database + catalog.setCurrentDatabase("dbx") + maybeWrapException(isDatasourceTable) { + sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')") + } + if (!isDatasourceTable) { + assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1)) + } + // table to alter does not exist + intercept[AnalysisException] { + sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')") + } + // partition to drop does not exist + intercept[AnalysisException] { + sql("ALTER TABLE tab1 DROP PARTITION (x='300')") + } + maybeWrapException(isDatasourceTable) { + sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')") + } + if (!isDatasourceTable) { + assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1)) + } + } + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 98a5998d03dd1..7ba71b438a5a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -213,7 +213,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat db: String, table: String, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = withClient { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = withClient { requireTableExists(db, table) // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we // need to implement it here ourselves. This is currently somewhat expensive because @@ -233,7 +234,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat parts } if (partsToDrop.nonEmpty) { - client.dropPartitions(db, table, partsToDrop) + client.dropPartitions(db, table, partsToDrop, purge) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index ee56f9d75da80..99b212f34c834 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -129,7 +129,8 @@ private[hive] trait HiveClient { def dropPartitions( db: String, table: String, - specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit + specs: Seq[ExternalCatalog.TablePartitionSpec], + purge: Boolean): Unit /** * Rename one or many existing table partitions, assuming they exist. 
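For context, a minimal sketch of the end-to-end behavior this patch implements, assuming the test-style sql(...) helper and ScalaTest's intercept from DDLSuite are in scope (table names and partition values are illustrative):

    // Sketch only, not part of the patch: drops existing partitions in one statement.
    sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
    // Without IF EXISTS, dropping a nonexistent partition fails with AnalysisException.
    intercept[AnalysisException] {
      sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
    }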
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 1f66fbfd85ffa..366bd1ed9b9fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -367,9 +367,10 @@ private[hive] class HiveClientImpl( override def dropPartitions( db: String, table: String, - specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState { + specs: Seq[ExternalCatalog.TablePartitionSpec], + purge: Boolean): Unit = withHiveState { // TODO: figure out how to drop multiple partitions in one call - specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) } + specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, purge) } } override def renamePartitions( From cccc66914aa9a379183a2fca7181dbae03907d0d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 15:14:31 -0700 Subject: [PATCH 06/14] Throw exception for touch/compact/concatenate --- .../spark/sql/execution/SparkSqlParser.scala | 16 +----- .../spark/sql/execution/command/ddl.scala | 16 ------ .../execution/command/DDLCommandSuite.scala | 56 ++++--------------- 3 files changed, 15 insertions(+), 73 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 7b2e4c2af21a5..96caefb3b65a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -636,10 +636,7 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) { - AlterTableTouch( - visitTableIdentifier(ctx.tableIdentifier), - Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))( - command(ctx)) + throw new AnalysisException("Operation not allowed: ALTER TABLE ... TOUCH ...") } /** @@ -651,11 +648,7 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) { - AlterTableCompact( - visitTableIdentifier(ctx.tableIdentifier), - Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), - string(ctx.STRING))( - command(ctx)) + throw new AnalysisException("Operation not allowed: ALTER TABLE ... COMPACT ...") } /** @@ -667,10 +660,7 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) { - AlterTableMerge( - visitTableIdentifier(ctx.tableIdentifier), - Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))( - command(ctx)) + throw new AnalysisException("Operation not allowed: ALTER TABLE ... 
CONCATENATE") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a6a6e44a30f54..6e460fa49fff2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -434,22 +434,6 @@ case class AlterTableSetLocation( } -case class AlterTableTouch( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec])(sql: String) - extends NativeDDLCommand(sql) with Logging - -case class AlterTableCompact( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec], - compactType: String)(sql: String) - extends NativeDDLCommand(sql) with Logging - -case class AlterTableMerge( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec])(sql: String) - extends NativeDDLCommand(sql) with Logging - case class AlterTableChangeCol( tableName: TableIdentifier, partitionSpec: Option[TablePartitionSpec], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 9956ef711eff7..1e6d81dc95c28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -499,55 +499,23 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed2, expected2) } - test("alter table: touch") { - val sql1 = "ALTER TABLE table_name TOUCH" - val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')" - val parsed1 = parser.parsePlan(sql1) - val parsed2 = parser.parsePlan(sql2) - val tableIdent = TableIdentifier("table_name", None) - val expected1 = AlterTableTouch( - tableIdent, - None)(sql1) - val expected2 = AlterTableTouch( - tableIdent, - Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2) - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) + test("alter table: touch (not supported)") { + assertUnsupported("ALTER TABLE table_name TOUCH") + assertUnsupported("ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')") } - test("alter table: compact") { - val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'" - val sql2 = + test("alter table: compact (not supported)") { + assertUnsupported("ALTER TABLE table_name COMPACT 'compaction_type'") + assertUnsupported( """ - |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') - |COMPACT 'MAJOR' - """.stripMargin - val parsed1 = parser.parsePlan(sql1) - val parsed2 = parser.parsePlan(sql2) - val tableIdent = TableIdentifier("table_name", None) - val expected1 = AlterTableCompact( - tableIdent, - None, - "compaction_type")(sql1) - val expected2 = AlterTableCompact( - tableIdent, - Some(Map("dt" -> "2008-08-08", "country" -> "us")), - "MAJOR")(sql2) - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) + |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') + |COMPACT 'MAJOR' + """.stripMargin) } - test("alter table: concatenate") { - val sql1 = "ALTER TABLE table_name CONCATENATE" - val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE" - val parsed1 = parser.parsePlan(sql1) - val parsed2 = parser.parsePlan(sql2) - val tableIdent = TableIdentifier("table_name", None) - val expected1 = AlterTableMerge(tableIdent, None)(sql1) - val 
expected2 = AlterTableMerge( - tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2) - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) + test("alter table: concatenate (not supported)") { + assertUnsupported("ALTER TABLE table_name CONCATENATE") + assertUnsupported("ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE") } test("alter table: change column name/type/position/comment") { From cdbf251d06a19c3656b7d93f390418c685dd85c2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 15:20:15 -0700 Subject: [PATCH 07/14] Implement rename partition --- .../spark/sql/execution/SparkSqlParser.scala | 3 +- .../spark/sql/execution/command/ddl.scala | 20 ++++++++++-- .../execution/command/DDLCommandSuite.scala | 2 +- .../sql/execution/command/DDLSuite.scala | 32 +++++++++++++++++++ 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 96caefb3b65a4..dfaadab99084d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -526,8 +526,7 @@ class SparkSqlAstBuilder extends AstBuilder { AlterTableRenamePartition( visitTableIdentifier(ctx.tableIdentifier), visitNonOptionalPartitionSpec(ctx.from), - visitNonOptionalPartitionSpec(ctx.to))( - command(ctx)) + visitNonOptionalPartitionSpec(ctx.to)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6e460fa49fff2..d555c80c2bcc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -337,11 +337,27 @@ case class AlterTableAddPartition( } +/** + * Alter a table partition's spec. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2; + * }}} + */ case class AlterTableRenamePartition( tableName: TableIdentifier, oldPartition: TablePartitionSpec, - newPartition: TablePartitionSpec)(sql: String) - extends NativeDDLCommand(sql) with Logging + newPartition: TablePartitionSpec) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.catalog.renamePartitions( + tableName, Seq(oldPartition), Seq(newPartition)) + Seq.empty[Row] + } + +} /** * Drop Partition in ALTER TABLE/VIEW: to drop a particular partition for a table/view. 
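For context, a minimal sketch of the rename path implemented above, assuming the test-style sql(...) helper and ScalaTest's intercept from DDLSuite are in scope (table names and partition values are illustrative):

    // Sketch only, not part of the patch: moves a partition to a new spec.
    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
    // Renaming a partition that does not exist surfaces an AnalysisException.
    intercept[AnalysisException] {
      sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
    }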
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 1e6d81dc95c28..ec43b5d6d61a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -379,7 +379,7 @@ class DDLCommandSuite extends PlanTest { val expected = AlterTableRenamePartition( TableIdentifier("table_name", None), Map("dt" -> "2008-08-08", "country" -> "us"), - Map("dt" -> "2008-09-09", "country" -> "uk"))(sql) + Map("dt" -> "2008-09-09", "country" -> "uk")) comparePlans(parsed, expected) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 9bfef9d788dc4..561b6fb3f6080 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -339,6 +339,38 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { testDropPartitions(isDatasourceTable = true) } + test("alter table: rename partition") { + val catalog = sqlContext.sessionState.catalog + val tableIdent = TableIdentifier("tab1", Some("dbx")) + val part1 = Map("a" -> "1") + val part2 = Map("b" -> "2") + val part3 = Map("c" -> "3") + createDatabase(catalog, "dbx") + createTable(catalog, tableIdent) + createTablePartition(catalog, part1, tableIdent) + createTablePartition(catalog, part2, tableIdent) + createTablePartition(catalog, part3, tableIdent) + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(part1, part2, part3)) + sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')") + sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')") + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(Map("a" -> "100"), Map("b" -> "200"), part3)) + // rename without explicitly specifying database + catalog.setCurrentDatabase("dbx") + sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')") + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(Map("a" -> "10"), Map("b" -> "200"), part3)) + // table to alter does not exist + intercept[AnalysisException] { + sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')") + } + // partition to rename does not exist + intercept[AnalysisException] { + sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')") + } + } + // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext test("show tables") { From 74f3a08d739a0842818f05915208b81f91aeb278 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Apr 2016 15:34:35 -0700 Subject: [PATCH 08/14] Fix style --- .../apache/spark/sql/execution/command/DDLCommandSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index ec43b5d6d61a0..7f8e343026b97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -515,7 +515,8 @@ class DDLCommandSuite extends PlanTest { test("alter 
table: concatenate (not supported)") { assertUnsupported("ALTER TABLE table_name CONCATENATE") - assertUnsupported("ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE") + assertUnsupported( + "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE") } test("alter table: change column name/type/position/comment") { From 85c8b8f8fda68bcfd1f551e952040d281a5ca38c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 7 Apr 2016 14:33:26 -0700 Subject: [PATCH 09/14] Fix tests It seems that Hive supports dropping partitions based on partial specs, where not all partitioned columns are accompanied by specified values in the spec. Ironically, there is no API to achieve this using the Hive client, so we need to implement it ourselves in Spark (see HiveClientImpl.scala). Additionally, two tests in HiveCompatibilitySuite use features that we explicitly do not allow, so those tests are now added to the blacklist. --- .../sql/hive/execution/HiveCompatibilitySuite.scala | 10 +++++++--- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 10 +++++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 9e3cb18d457ce..f0eeda09dba5a 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -376,7 +376,13 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Create partitioned view is not supported "create_like_view", - "describe_formatted_view_partitioned" + "describe_formatted_view_partitioned", + + // This uses CONCATENATE, which we don't support + "alter_merge_2", + + // TOUCH is not supported + "touch" ) /** @@ -392,7 +398,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "alter2", "alter3", "alter5", - "alter_merge_2", "alter_partition_format_loc", "alter_partition_with_whitelist", "alter_rename_partition", @@ -897,7 +902,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "timestamp_comparison", "timestamp_lazy", "timestamp_null", - "touch", "transform_ppr1", "transform_ppr2", "truncate_table", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 366bd1ed9b9fd..7093ec4700bf3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -370,7 +370,15 @@ private[hive] class HiveClientImpl( specs: Seq[ExternalCatalog.TablePartitionSpec], purge: Boolean): Unit = withHiveState { // TODO: figure out how to drop multiple partitions in one call - specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, purge) } + val hiveTable = client.getTable(db, table, true /* throw exception */) + specs.foreach { s => + // The provided spec here can be a partial spec, i.e. it will match all partitions + // whose specs are supersets of this partial spec. E.g. If a table has partitions + // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both. 
+ client.getPartitions(hiveTable, s.asJava).asScala.foreach { hivePartition => + client.dropPartition(db, table, hivePartition.getValues, purge) + } + } } override def renamePartitions( From 2a37c75fe19fd985ec386455570d69b922e17ac8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 7 Apr 2016 16:40:45 -0700 Subject: [PATCH 10/14] Fix tests --- .../org/apache/spark/sql/hive/client/HiveClient.scala | 2 +- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 99b212f34c834..beb4c5da5e7fb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -120,7 +120,7 @@ private[hive] trait HiveClient { ignoreIfExists: Boolean): Unit /** - * Drop one or many partitions in the given table. + * Drop one or many partitions in the given table, assuming they exist. * * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the * partitions do not already exist. The seemingly relevant flag `ifExists` in diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7093ec4700bf3..dbbf77d040c25 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -375,7 +375,12 @@ private[hive] class HiveClientImpl( // The provided spec here can be a partial spec, i.e. it will match all partitions // whose specs are supersets of this partial spec. E.g. If a table has partitions // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both. 
- client.getPartitions(hiveTable, s.asJava).asScala.foreach { hivePartition => + val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala + if (matchingParts.isEmpty) { + throw new AnalysisException( + s"partition to drop '$s' does not exist in table '$table' database '$db'") + } + matchingParts.foreach { hivePartition => client.dropPartition(db, table, hivePartition.getValues, purge) } } From 73597b2d1a6506ebc23d8486af4da1376e2206f4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 7 Apr 2016 17:00:27 -0700 Subject: [PATCH 11/14] Fix compile after rebase --- .../scala/org/apache/spark/sql/execution/command/ddl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index bb7fd40b0b50e..a7afdd12303bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -323,7 +323,7 @@ case class AlterTableAddPartition( override def run(sqlContext: SQLContext): Seq[Row] = { val catalog = sqlContext.sessionState.catalog - val table = catalog.getTable(tableName) + val table = catalog.getTableMetadata(tableName) if (DDLUtils.isDatasourceTable(table)) { throw new AnalysisException( "alter table add partition is not allowed for tables defined using the datasource API") @@ -383,7 +383,7 @@ case class AlterTableDropPartition( override def run(sqlContext: SQLContext): Seq[Row] = { val catalog = sqlContext.sessionState.catalog - val table = catalog.getTable(tableName) + val table = catalog.getTableMetadata(tableName) if (DDLUtils.isDatasourceTable(table)) { throw new AnalysisException( "alter table drop partition is not allowed for tables defined using the datasource API") From 220141dc748d8278513a7fece270f26be3f33717 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 7 Apr 2016 17:32:03 -0700 Subject: [PATCH 12/14] Correct drop partitions purge behavior and also push ignoreIfNotExists into HiveClient. --- .../spark/sql/hive/HiveExternalCatalog.scala | 21 +------------------ .../spark/sql/hive/client/HiveClient.scala | 5 +---- .../sql/hive/client/HiveClientImpl.scala | 10 ++++++--- 3 files changed, 9 insertions(+), 27 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index f2370c0ae18a6..4048a5e36d1a7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -216,26 +216,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat ignoreIfNotExists: Boolean, purge: Boolean): Unit = withClient { requireTableExists(db, table) - // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we - // need to implement it here ourselves. This is currently somewhat expensive because - // we make multiple synchronous calls to Hive for each partition we want to drop. 
-    val partsToDrop =
-      if (ignoreIfNotExists) {
-        parts.filter { spec =>
-          try {
-            getPartition(db, table, spec)
-            true
-          } catch {
-            // Filter out the partitions that do not actually exist
-            case _: AnalysisException => false
-          }
-        }
-      } else {
-        parts
-      }
-    if (partsToDrop.nonEmpty) {
-      client.dropPartitions(db, table, partsToDrop, purge)
-    }
+    client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
   }
 
   override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8a61f3dc9bbb6..e06debb3b1e4b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -121,15 +121,12 @@ private[hive] trait HiveClient {
 
   /**
    * Drop one or many partitions in the given table, assuming they exist.
-   *
-   * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
-   * partitions do not already exist. The seemingly relevant flag `ifExists` in
-   * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
    */
   def dropPartitions(
       db: String,
       table: String,
       specs: Seq[ExternalCatalog.TablePartitionSpec],
+      ignoreIfNotExists: Boolean,
       purge: Boolean): Unit
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f0f4eb30f1f17..276cad1f76e24 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -368,6 +368,7 @@ private[hive] class HiveClientImpl(
       db: String,
       table: String,
       specs: Seq[ExternalCatalog.TablePartitionSpec],
+      ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
@@ -376,12 +377,15 @@ private[hive] class HiveClientImpl(
       // The provided spec here can be a partial spec, i.e. it will match all partitions
       // whose specs are supersets of this partial spec. E.g. If a table has partitions
       // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
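For context, the partial-spec matching described in the comment above can be sketched in isolation. This is illustrative only: in the code under review, Hive's getPartitions performs the matching on the metastore side, and the names PartialSpecExample and matches below are made up for the sketch.

object PartialSpecExample {
  type TablePartitionSpec = Map[String, String]

  // A full partition spec matches a partial spec when every (key, value)
  // pair of the partial spec also appears in the full spec.
  def matches(partial: TablePartitionSpec, full: TablePartitionSpec): Boolean =
    partial.forall { case (k, v) => full.get(k).contains(v) }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Map("b" -> "1", "c" -> "1"), Map("b" -> "1", "c" -> "2"))
    // Mirrors the example in the comment: (b='1') matches both partitions.
    assert(partitions.count(matches(Map("b" -> "1"), _)) == 2)
  }
}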
       val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
-      if (matchingParts.isEmpty) {
+      if (matchingParts.isEmpty && !ignoreIfNotExists) {
         throw new AnalysisException(
           s"partition to drop '$s' does not exist in table '$table' database '$db'")
       }
       matchingParts.foreach { hivePartition =>
-        client.dropPartition(db, table, hivePartition.getValues, purge)
+        val dropOptions = new PartitionDropOptions
+        dropOptions.ifExists = ignoreIfNotExists
+        dropOptions.purgeData = purge
+        client.dropPartition(db, table, hivePartition.getValues, dropOptions)
       }
     }

From 3fb9a87cc786b23c5bad6a651e4163afbae0a045 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 8 Apr 2016 13:44:50 -0700
Subject: [PATCH 13/14] Views may not be partitioned

A previous patch already throws an exception when this happens. I changed
the exception to AnalysisException so the test is easier to write.
Eventually all of these exceptions will be made consistent. This is mainly
just a documentation + test change.
---
 .../org/apache/spark/sql/execution/SparkSqlParser.scala   | 8 ++++++--
 .../org/apache/spark/sql/execution/command/ddl.scala      | 8 ++++----
 .../org/apache/spark/sql/execution/command/DDLSuite.scala | 8 ++++++++
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index dfaadab99084d..9474e9488539a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -480,7 +480,9 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitAddTablePartition(
       ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+    if (ctx.VIEW != null) {
+      throw new AnalysisException(s"Operation not allowed: partitioned views")
+    }
     // Create partition spec to location mapping.
     val specsAndLocs = if (ctx.partitionSpec.isEmpty) {
       ctx.partitionSpecLocation.asScala.map {
@@ -543,7 +545,9 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitDropTablePartitions(
       ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+    if (ctx.VIEW != null) {
+      throw new AnalysisException(s"Operation not allowed: partitioned views")
+    }
     AlterTableDropPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index a7afdd12303bd..5685514917989 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -304,7 +304,8 @@ case class AlterTableSerDeProperties(
 }
 
 /**
- * Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
+ * Add Partition in ALTER TABLE: add the table partitions.
+ *
  * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
  * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
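To make the user-visible effect of the parser change above concrete, here is a hedged usage sketch. The table and view names are hypothetical, and sqlContext is assumed to be a SQLContext backed by a Hive catalog in which tab1 already exists as a partitioned table:

import org.apache.spark.sql.{AnalysisException, SQLContext}

def demoPartitionedViewsRejected(sqlContext: SQLContext): Unit = {
  // Still supported: parses into AlterTableAddPartition and runs against tab1.
  sqlContext.sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (b='2')")
  // No longer supported: the VIEW form now fails while the plan is built.
  try {
    sqlContext.sql("ALTER VIEW view1 ADD IF NOT EXISTS PARTITION (b='2')")
  } catch {
    case e: AnalysisException =>
      assert(e.getMessage.contains("Operation not allowed"))
  }
}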
@@ -312,7 +313,6 @@ case class AlterTableSerDeProperties(
  * The syntax of this command is:
  * {{{
  *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
- *   ALTER VIEW view ADD [IF NOT EXISTS] PARTITION spec
  * }}}
  */
 case class AlterTableAddPartition(
@@ -361,7 +361,8 @@ case class AlterTableRenamePartition(
 }
 
 /**
- * Drop Partition in ALTER TABLE/VIEW: to drop a particular partition for a table/view.
+ * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
+ *
  * This removes the data and metadata for this partition.
  * The data is actually moved to the .Trash/Current directory if Trash is configured,
  * unless 'purge' is true, but the metadata is completely lost.
@@ -371,7 +372,6 @@ case class AlterTableRenamePartition(
  * The syntax of this command is:
  * {{{
  *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
- *   ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
  * }}}
  */
 case class AlterTableDropPartition(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5c2b56f5718cc..5795ba27592df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -332,6 +332,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: add partition is not supported for views") {
+    assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
+  }
+
   test("alter table: drop partition") {
     testDropPartitions(isDatasourceTable = false)
   }
@@ -340,6 +344,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testDropPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: drop partition is not supported for views") {
+    assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
+  }
+
   test("alter table: rename partition") {
     val catalog = sqlContext.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))

From e3f08b35efa502f34aabcbba266350e4bfefb0c9 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 11 Apr 2016 18:23:41 -0700
Subject: [PATCH 14/14] Don't purge

---
 .../catalyst/catalog/InMemoryCatalog.scala    |  3 +--
 .../sql/catalyst/catalog/SessionCatalog.scala |  5 ++---
 .../sql/catalyst/catalog/interface.scala      |  3 +--
 .../catalyst/catalog/CatalogTestCases.scala   | 12 +++++------
 .../catalog/SessionCatalogSuite.scala         | 21 +++++++------------
 .../spark/sql/execution/SparkSqlParser.scala  |  6 ++++--
 .../spark/sql/execution/command/ddl.scala     |  5 ++---
 .../execution/command/DDLCommandSuite.scala   | 16 +++++---------
 .../spark/sql/hive/HiveExternalCatalog.scala  |  5 ++---
 .../spark/sql/hive/client/HiveClient.scala    |  3 +--
 .../sql/hive/client/HiveClientImpl.scala      |  4 +---
 11 files changed, 32 insertions(+), 51 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index d453e03fad816..f8a6fb74cc87d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -231,8 +231,7 @@ class InMemoryCatalog extends ExternalCatalog {
       db: String,
       table: String,
       partSpecs: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = synchronized {
+      ignoreIfNotExists: Boolean): Unit = synchronized {
     requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 41956b6f1e86e..34e1cb7315a9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -380,11 +380,10 @@ class SessionCatalog(
   def dropPartitions(
       tableName: TableIdentifier,
       parts: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = {
+      ignoreIfNotExists: Boolean): Unit = {
     val db = tableName.database.getOrElse(currentDb)
     val table = formatTableName(tableName.table)
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c71f9e6a05692..4ef59316ceb27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -113,8 +113,7 @@ abstract class ExternalCatalog {
       db: String,
       table: String,
       parts: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      ignoreIfNotExists: Boolean): Unit
 
   /**
    * Override the specs of one or many existing table partitions, assuming they exist.
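A hedged caller-side sketch of the narrowed signature, for reference. The catalog instance and partition layout are assumptions mirroring the test fixtures below:

import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Assumes `catalog` holds a table db2.tbl2 partitioned by (dt, country).
def dropUsPartition(catalog: ExternalCatalog): Unit = {
  val spec = Map("dt" -> "2008-08-08", "country" -> "us")
  // There is no purge flag anymore. With ignoreIfNotExists = false, a missing
  // partition surfaces as an AnalysisException instead of being skipped.
  catalog.dropPartitions("db2", "tbl2", Seq(spec), ignoreIfNotExists = true)
}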
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index f5e66ec1fa72a..0d9b0851fa7be 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -282,13 +282,13 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = true)
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = true)
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -296,11 +296,11 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = true)
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = true)
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
     }
   }
 
@@ -308,10 +308,10 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = true)
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
     }
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = true)
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
   }
 
   test("get partition") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3d3044b548a36..426273e1e3e6c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -498,16 +498,14 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
-      ignoreIfNotExists = false,
-      purge = true)
+      ignoreIfNotExists = false)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2"),
       Seq(part2.spec),
-      ignoreIfNotExists = false,
-      purge = true)
+      ignoreIfNotExists = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
@@ -516,8 +514,7 @@
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
-      ignoreIfNotExists = false,
-      purge = true)
+      ignoreIfNotExists = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -527,15 +524,13 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.dropPartitions(
         TableIdentifier("tbl1", Some("does_not_exist")),
         Seq(),
-        ignoreIfNotExists = false,
-        purge = true)
+        ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
         TableIdentifier("does_not_exist", Some("db2")),
         Seq(),
-        ignoreIfNotExists = false,
-        purge = true)
+        ignoreIfNotExists = false)
     }
   }
 
@@ -545,14 +540,12 @@
       catalog.dropPartitions(
         TableIdentifier("tbl2", Some("db2")),
         Seq(part3.spec),
-        ignoreIfNotExists = false,
-        purge = true)
+        ignoreIfNotExists = false)
     }
     catalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part3.spec),
-      ignoreIfNotExists = true,
-      purge = true)
+      ignoreIfNotExists = true)
   }
 
   test("get partition") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6637ad79a22c7..73d9640c35f49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -561,11 +561,13 @@ class SparkSqlAstBuilder extends AstBuilder {
     if (ctx.VIEW != null) {
       throw new AnalysisException(s"Operation not allowed: partitioned views")
     }
+    if (ctx.PURGE != null) {
+      throw new AnalysisException(s"Operation not allowed: PURGE")
+    }
     AlterTableDropPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)
+      ctx.EXISTS != null)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 9879c592dfcd1..c55b1a690e85b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -422,8 +422,7 @@ case class AlterTableRenamePartition(
 case class AlterTableDropPartition(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
-    ifExists: Boolean,
-    purge: Boolean)
+    ifExists: Boolean)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
@@ -433,7 +432,7 @@ case class AlterTableDropPartition(
       throw new AnalysisException(
         "alter table drop partition is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
+    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
 
     Seq.empty[Row]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d1116ae1cc708..1c8dd6828673e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -408,7 +408,10 @@ class DDLCommandSuite extends PlanTest {
     val sql2_view = sql2_table.replace("TABLE", "VIEW").replace("PURGE", "")
 
     val parsed1_table = parser.parsePlan(sql1_table)
-    val parsed2_table = parser.parsePlan(sql2_table)
+    val e = intercept[ParseException] {
+      parser.parsePlan(sql2_table)
+    }
+    assert(e.getMessage.contains("Operation not allowed"))
 
     intercept[ParseException] {
       parser.parsePlan(sql1_view)
@@ -423,18 +426,9 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = true,
-      purge = false)
-    val expected2_table = AlterTableDropPartition(
-      tableIdent,
-      Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = false,
-      purge = true)
+      ifExists = true)
 
     comparePlans(parsed1_table, expected1_table)
-    comparePlans(parsed2_table, expected2_table)
   }
 
   test("alter table: archive partition (not supported)") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index c5863e7475bb1..482f47428d33a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -217,10 +217,9 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
       db: String,
       table: String,
       parts: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withClient {
+      ignoreIfNotExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+    client.dropPartitions(db, table, parts, ignoreIfNotExists)
   }
 
   override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index e06debb3b1e4b..6f7e7bf45106f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -126,8 +126,7 @@ private[hive] trait HiveClient {
       db: String,
       table: String,
       specs: Seq[ExternalCatalog.TablePartitionSpec],
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      ignoreIfNotExists: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 16ab095787d5b..39e26acd7fe9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -368,8 +368,7 @@ private[hive] class HiveClientImpl(
       db: String,
       table: String,
       specs: Seq[ExternalCatalog.TablePartitionSpec],
-      ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withHiveState {
+      ignoreIfNotExists: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
     specs.foreach { s =>
@@ -384,7 +383,6 @@ private[hive] class HiveClientImpl(
       matchingParts.foreach { hivePartition =>
         val dropOptions = new PartitionDropOptions
         dropOptions.ifExists = ignoreIfNotExists
-        dropOptions.purgeData = purge
         client.dropPartition(db, table, hivePartition.getValues, dropOptions)
       }
     }
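The net effect of patches 12 and 14 on the metastore call, as a hedged sketch: ignoreIfNotExists is forwarded to Hive's PartitionDropOptions.ifExists, while purgeData is left at its default of false now that PURGE is rejected at parse time, so dropped data follows the normal trash path when trash is configured. The helper name below is made up, and `client` stands in for the Hive client object used by HiveClientImpl:

import java.util.{List => JList}

import org.apache.hadoop.hive.metastore.PartitionDropOptions
import org.apache.hadoop.hive.ql.metadata.Hive

// Illustrates the per-partition drop performed in HiveClientImpl.dropPartitions
// after this series; field assignments follow the patch above.
def dropOnePartition(
    client: Hive,
    db: String,
    table: String,
    partValues: JList[String],
    ignoreIfNotExists: Boolean): Unit = {
  val dropOptions = new PartitionDropOptions
  dropOptions.ifExists = ignoreIfNotExists
  // purgeData is deliberately not set: ALTER TABLE ... DROP PARTITION ... PURGE
  // now fails with "Operation not allowed: PURGE" before reaching this code.
  client.dropPartition(db, table, partValues, dropOptions)
}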