diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9e6dfb7e9506f..38be61c52a95e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.AnalysisException
  * All public methods should be synchronized for thread-safety.
  */
 class InMemoryCatalog extends Catalog {
+  import Catalog._
 
   private class TableDesc(var table: Table) {
-    val partitions = new mutable.HashMap[String, TablePartition]
+    val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
   }
 
   private class DatabaseDesc(var db: Database) {
@@ -46,13 +47,20 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def existsFunction(db: String, funcName: String): Boolean = {
+    assertDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
   private def existsTable(db: String, table: String): Boolean = {
+    assertDbExists(db)
     catalog(db).tables.contains(table)
   }
 
+  private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
+    assertTableExists(db, table)
+    catalog(db).tables(table).partitions.contains(spec)
+  }
+
   private def assertDbExists(db: String): Unit = {
     if (!catalog.contains(db)) {
       throw new AnalysisException(s"Database $db does not exist")
     }
   }
@@ -60,16 +68,20 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def assertFunctionExists(db: String, funcName: String): Unit = {
-    assertDbExists(db)
     if (!existsFunction(db, funcName)) {
-      throw new AnalysisException(s"Function $funcName does not exists in $db database")
+      throw new AnalysisException(s"Function $funcName does not exist in $db database")
     }
   }
 
   private def assertTableExists(db: String, table: String): Unit = {
-    assertDbExists(db)
     if (!existsTable(db, table)) {
-      throw new AnalysisException(s"Table $table does not exists in $db database")
+      throw new AnalysisException(s"Table $table does not exist in $db database")
+    }
+  }
+
+  private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+    if (!existsPartition(db, table, spec)) {
+      throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
     }
   }
 
@@ -77,9 +89,11 @@ class InMemoryCatalog extends Catalog {
   // Databases
   // --------------------------------------------------------------------------
 
-  override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
+  override def createDatabase(
+      dbDefinition: Database,
+      ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
-      if (!ifNotExists) {
+      if (!ignoreIfExists) {
         throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
       }
     } else {
@@ -88,9 +102,9 @@ class InMemoryCatalog extends Catalog {
   }
 
   override def dropDatabase(
-    db: String,
-    ignoreIfNotExists: Boolean,
-    cascade: Boolean): Unit = synchronized {
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = synchronized {
     if (catalog.contains(db)) {
       if (!cascade) {
         // If cascade is false, make sure the database is empty.
@@ -133,11 +147,13 @@ class InMemoryCatalog extends Catalog {
   // Tables
   // --------------------------------------------------------------------------
 
-  override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
-    : Unit = synchronized {
+  override def createTable(
+      db: String,
+      tableDefinition: Table,
+      ignoreIfExists: Boolean): Unit = synchronized {
     assertDbExists(db)
     if (existsTable(db, tableDefinition.name)) {
-      if (!ifNotExists) {
+      if (!ignoreIfExists) {
         throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
       }
     } else {
@@ -145,8 +161,10 @@ class InMemoryCatalog extends Catalog {
     }
   }
 
-  override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
-    : Unit = synchronized {
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = synchronized {
     assertDbExists(db)
     if (existsTable(db, table)) {
       catalog(db).tables.remove(table)
@@ -190,14 +208,67 @@ class InMemoryCatalog extends Catalog {
   // Partitions
   // --------------------------------------------------------------------------
 
-  override def alterPartition(db: String, table: String, part: TablePartition)
-    : Unit = synchronized {
-    throw new UnsupportedOperationException
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartition],
+      ignoreIfExists: Boolean): Unit = synchronized {
+    assertTableExists(db, table)
+    val existingParts = catalog(db).tables(table).partitions
+    if (!ignoreIfExists) {
+      val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
+      if (dupSpecs.nonEmpty) {
+        val dupSpecsStr = dupSpecs.mkString("\n===\n")
+        throw new AnalysisException(
+          s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
+      }
+    }
+    parts.foreach { p => existingParts.put(p.spec, p) }
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      partSpecs: Seq[PartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = synchronized {
+    assertTableExists(db, table)
+    val existingParts = catalog(db).tables(table).partitions
+    if (!ignoreIfNotExists) {
+      val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
+      if (missingSpecs.nonEmpty) {
+        val missingSpecsStr = missingSpecs.mkString("\n===\n")
+        throw new AnalysisException(
+          s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
+      }
+    }
+    partSpecs.foreach(existingParts.remove)
   }
 
-  override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
-    : Unit = synchronized {
-    throw new UnsupportedOperationException
+  override def alterPartition(
+      db: String,
+      table: String,
+      spec: Map[String, String],
+      newPart: TablePartition): Unit = synchronized {
+    assertPartitionExists(db, table, spec)
+    val existingParts = catalog(db).tables(table).partitions
+    if (spec != newPart.spec) {
+      // Also a change in specs; remove the old one and add the new one back
+      existingParts.remove(spec)
+    }
+    existingParts.put(newPart.spec, newPart)
+  }
+
+  override def getPartition(
+      db: String,
+      table: String,
+      spec: Map[String, String]): TablePartition = synchronized {
+    assertPartitionExists(db, table, spec)
+    catalog(db).tables(table).partitions(spec)
+  }
+
+  override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
+    assertTableExists(db, table)
+    catalog(db).tables(table).partitions.values.toSeq
   }
 
   // --------------------------------------------------------------------------
@@ -205,11 +276,12 @@ class InMemoryCatalog extends Catalog {
   // --------------------------------------------------------------------------
 
   override def createFunction(
-      db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
+      db: String,
+      func: Function,
+      ignoreIfExists: Boolean): Unit = synchronized {
     assertDbExists(db)
     if (existsFunction(db, func.name)) {
-      if (!ifNotExists) {
+      if (!ignoreIfExists) {
         throw new AnalysisException(s"Function $func already exists in $db database")
       }
     } else {
@@ -222,14 +294,16 @@ class InMemoryCatalog extends Catalog {
     catalog(db).functions.remove(funcName)
   }
 
-  override def alterFunction(db: String, funcName: String, funcDefinition: Function)
-    : Unit = synchronized {
+  override def alterFunction(
+      db: String,
+      funcName: String,
+      funcDefinition: Function): Unit = synchronized {
     assertFunctionExists(db, funcName)
     if (funcName != funcDefinition.name) {
       // Also a rename; remove the old one and add the new one back
       catalog(db).functions.remove(funcName)
     }
-    catalog(db).functions.put(funcName, funcDefinition)
+    catalog(db).functions.put(funcDefinition.name, funcDefinition)
   }
 
   override def getFunction(db: String, funcName: String): Function = synchronized {
@@ -239,7 +313,6 @@ class InMemoryCatalog extends Catalog {
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
     assertDbExists(db)
-    val regex = pattern.replaceAll("\\*", ".*").r
     filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a6caf91f3304b..b4d7dd2f4e31c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -29,17 +29,15 @@ import org.apache.spark.sql.AnalysisException
  * Implementations should throw [[AnalysisException]] when table or database don't exist.
  */
 abstract class Catalog {
+  import Catalog._
 
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
-  def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
+  def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
 
-  def dropDatabase(
-      db: String,
-      ignoreIfNotExists: Boolean,
-      cascade: Boolean): Unit
+  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
 
   def alterDatabase(db: String, dbDefinition: Database): Unit
 
@@ -71,11 +69,28 @@ abstract class Catalog {
   // Partitions
   // --------------------------------------------------------------------------
 
-  // TODO: need more functions for partitioning.
+  def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartition],
+      ignoreIfExists: Boolean): Unit
 
-  def alterPartition(db: String, table: String, part: TablePartition): Unit
+  def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[PartitionSpec],
+      ignoreIfNotExists: Boolean): Unit
 
-  def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
+  def alterPartition(
+      db: String,
+      table: String,
+      spec: PartitionSpec,
+      newPart: TablePartition): Unit
+
+  def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+
+  // TODO: support listing by pattern
+  def listPartitions(db: String, table: String): Seq[TablePartition]
 
   // --------------------------------------------------------------------------
   // Functions
@@ -132,11 +147,11 @@ case class Column(
 /**
  * A partition (Hive style) defined in the catalog.
  *
- * @param values values for the partition columns
+ * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
  */
 case class TablePartition(
-    values: Seq[String],
+    spec: Catalog.PartitionSpec,
     storage: StorageFormat
 )
 
@@ -176,3 +191,8 @@ case class Database(
     locationUri: String,
     properties: Map[String, String]
 )
+
+
+object Catalog {
+  type PartitionSpec = Map[String, String]
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index ab9d5ac8a20eb..0d8434323fcbb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -27,6 +27,11 @@ import org.apache.spark.sql.AnalysisException
  * Implementations of the [[Catalog]] interface can create test suites by extending this.
  */
 abstract class CatalogTestCases extends SparkFunSuite {
+  private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map.empty[String, String])
+  private val part1 = TablePartition(Map[String, String]("a" -> "1"), storageFormat)
+  private val part2 = TablePartition(Map[String, String]("b" -> "2"), storageFormat)
+  private val part3 = TablePartition(Map[String, String]("c" -> "3"), storageFormat)
+
+  private val funcClass = "org.apache.spark.myFunc"
 
   protected def newEmptyCatalog(): Catalog
@@ -41,16 +46,16 @@ abstract class CatalogTestCases extends SparkFunSuite {
    */
   private def newBasicCatalog(): Catalog = {
     val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb("db1"), ifNotExists = false)
-    catalog.createDatabase(newDb("db2"), ifNotExists = false)
-
+    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
     catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
     catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
     catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
     catalog
   }
 
-  private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
+  private def newFunc(): Function = Function("funcname", funcClass)
 
   private def newDb(name: String = "default"): Database =
     Database(name, name + " description", "uri", Map.empty)
 
@@ -59,7 +64,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
     Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
       None, None)
 
-  private def newFunc(name: String): Function = Function(name, "class.name")
+  private def newFunc(name: String): Function = Function(name, funcClass)
 
   // --------------------------------------------------------------------------
   // Databases
@@ -67,10 +72,10 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("basic create, drop and list databases") {
     val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb(), ifNotExists = false)
+    catalog.createDatabase(newDb(), ignoreIfExists = false)
     assert(catalog.listDatabases().toSet == Set("default"))
 
-    catalog.createDatabase(newDb("default2"), ifNotExists = false)
+    catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
     assert(catalog.listDatabases().toSet == Set("default", "default2"))
   }
 
@@ -253,11 +258,194 @@ abstract class CatalogTestCases extends SparkFunSuite {
   // Partitions
   // --------------------------------------------------------------------------
 
-  // TODO: Add tests cases for partitions
+  test("basic create and list partitions") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
+    catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
+    assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+  }
+
+  test("create partitions when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
+    }
+  }
+
+  test("create partitions that already exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
+    }
+    catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
+  }
+
+  test("drop partitions") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+    val catalog2 = newBasicCatalog()
+    assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+    assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
+  }
+
+  test("drop partitions when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+    }
+  }
+
+  test("drop partitions that do not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+    }
+    catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+  }
+
+  test("get partition") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
+    assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "tbl1", part3.spec)
+    }
+  }
+
+  test("get partition when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getPartition("does_not_exist", "tbl1", part1.spec)
+    }
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "does_not_exist", part1.spec)
+    }
+  }
+
+  test("alter partitions") {
+    val catalog = newBasicCatalog()
+    val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
+    val partNewSpec = part1.copy(spec = Map("x" -> "10"))
+    // alter but keep spec the same
+    catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
+    assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
+    // alter and change spec
+    catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "tbl2", part1.spec)
+    }
+    assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
+  }
+
+  test("alter partition when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+    }
+    intercept[AnalysisException] {
+      catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+    }
+  }
 
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
 
-  // TODO: Add tests cases for functions
+  test("basic create and list functions") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+    assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+  }
+
+  test("create function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+    }
+  }
+
+  test("create function that already exists") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    }
+    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
+  }
+
+  test("drop function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
+    catalog.dropFunction("db2", "func1")
+    assert(catalog.listFunctions("db2", "*").isEmpty)
+  }
+
+  test("drop function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropFunction("does_not_exist", "something")
+    }
+  }
+
+  test("drop function that does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropFunction("db2", "does_not_exist")
+    }
+  }
+
+  test("get function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
+    intercept[AnalysisException] {
+      catalog.getFunction("db2", "does_not_exist")
+    }
+  }
+
+  test("get function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getFunction("does_not_exist", "func1")
+    }
+  }
+
+  test("alter function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    // alter func but keep name
+    catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
+    assert(catalog.getFunction("db2", "func1").className == "muhaha")
+    // alter func and change name
+    catalog.alterFunction("db2", "func1", newFunc("funcky"))
+    intercept[AnalysisException] {
+      catalog.getFunction("db2", "func1")
+    }
+    assert(catalog.getFunction("db2", "funcky").className == funcClass)
+  }
+
+  test("alter function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.alterFunction("does_not_exist", "func1", newFunc())
+    }
+  }
+
+  test("list functions") {
+    val catalog = newBasicCatalog()
+    catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+    assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
+    assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
+  }
 }
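For orientation, the sketch below is not part of the patch: it strings the new partition calls together against InMemoryCatalog. Every signature and constructor is taken from the diff above; the object name, the storage values, and the "ds" spec key are invented for illustration.

// Illustrative usage only -- mirrors the fixtures in CatalogTestCases above.
import org.apache.spark.sql.catalyst.catalog._

object PartitionApiWalkthrough {
  def main(args: Array[String]): Unit = {
    val catalog: Catalog = new InMemoryCatalog
    val demoStorage = StorageFormat("uri", "input", "output", "serde", Map.empty[String, String])

    // Set up a database and a table to hang partitions off, as newDb/newTable do in the tests.
    catalog.createDatabase(Database("db1", "db1 description", "uri", Map.empty), ignoreIfExists = false)
    val tbl = Table("tbl1", "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty,
      "EXTERNAL_TABLE", 0, 0, None, None)
    catalog.createTable("db1", tbl, ignoreIfExists = false)

    // Partitions are keyed by their spec, a Map[String, String] (Catalog.PartitionSpec).
    val part = TablePartition(Map("ds" -> "2016-02-01"), demoStorage)
    catalog.createPartitions("db1", "tbl1", Seq(part), ignoreIfExists = false)
    assert(catalog.getPartition("db1", "tbl1", part.spec) == part)

    // alterPartition re-keys the entry when the new partition carries a different spec.
    val moved = part.copy(spec = Map("ds" -> "2016-02-02"))
    catalog.alterPartition("db1", "tbl1", part.spec, moved)
    assert(catalog.listPartitions("db1", "tbl1") == Seq(moved))

    // Drop by spec; with ignoreIfNotExists = false an unknown spec raises AnalysisException.
    catalog.dropPartitions("db1", "tbl1", Seq(moved.spec), ignoreIfNotExists = false)
    assert(catalog.listPartitions("db1", "tbl1").isEmpty)
  }
}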