-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-13080] [SQL] Implement new Catalog API using Hive #11189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3b66605
f3e094a
4b09a7d
526f278
4aa6e66
433d180
ff5c5be
ff49f0c
ca98c00
71f9964
13795d8
af5ffc0
d7b18e6
a915d01
3ceb88d
07332ad
cdf1f70
bbb8170
2b72025
5e2cd3a
6519c2a
7d58fac
1c05b9b
4ecc3b1
863ebd0
5394492
fe295fb
43e3c66
2765649
428c3c5
ed9c6fa
cb288da
2ba1990
d9a7723
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec | ||
|
|
||
|
|
||
| /** | ||
| * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception | ||
| * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. | ||
| */ | ||
| abstract class NoSuchItemException extends Exception { override def getMessage: String } | ||
|
|
||
| class NoSuchDatabaseException(db: String) extends NoSuchItemException { | ||
| override def getMessage: String = s"Database $db not found" | ||
| } | ||
|
|
||
| class NoSuchTableException(db: String, table: String) extends NoSuchItemException { | ||
| override def getMessage: String = s"Table $table not found in database $db" | ||
| } | ||
|
|
||
| class NoSuchPartitionException( | ||
| db: String, | ||
| table: String, | ||
| spec: TablePartitionSpec) | ||
| extends NoSuchItemException { | ||
|
|
||
| override def getMessage: String = { | ||
| s"Partition not found in table $table database $db:\n" + spec.mkString("\n") | ||
| } | ||
| } | ||
|
|
||
| class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException { | ||
| override def getMessage: String = s"Function $func not found in database $db" | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException | |
| class InMemoryCatalog extends Catalog { | ||
| import Catalog._ | ||
|
|
||
| private class TableDesc(var table: Table) { | ||
| val partitions = new mutable.HashMap[PartitionSpec, TablePartition] | ||
| private class TableDesc(var table: CatalogTable) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file is in catalog package, do we still need these prefix ( |
||
| val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition] | ||
| } | ||
|
|
||
| private class DatabaseDesc(var db: Database) { | ||
| private class DatabaseDesc(var db: CatalogDatabase) { | ||
| val tables = new mutable.HashMap[String, TableDesc] | ||
| val functions = new mutable.HashMap[String, Function] | ||
| val functions = new mutable.HashMap[String, CatalogFunction] | ||
| } | ||
|
|
||
| // Database name -> description | ||
| private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc] | ||
|
|
||
| private def filterPattern(names: Seq[String], pattern: String): Seq[String] = { | ||
|
|
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog { | |
| } | ||
|
|
||
| private def existsFunction(db: String, funcName: String): Boolean = { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we can leave these tiny rename out of this PR, it will definitely help reviewing. |
||
| catalog(db).functions.contains(funcName) | ||
| } | ||
|
|
||
| private def existsTable(db: String, table: String): Boolean = { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
| catalog(db).tables.contains(table) | ||
| } | ||
|
|
||
| private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = { | ||
| assertTableExists(db, table) | ||
| private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = { | ||
| requireTableExists(db, table) | ||
| catalog(db).tables(table).partitions.contains(spec) | ||
| } | ||
|
|
||
| private def assertDbExists(db: String): Unit = { | ||
| if (!catalog.contains(db)) { | ||
| throw new AnalysisException(s"Database $db does not exist") | ||
| } | ||
| } | ||
|
|
||
| private def assertFunctionExists(db: String, funcName: String): Unit = { | ||
| private def requireFunctionExists(db: String, funcName: String): Unit = { | ||
| if (!existsFunction(db, funcName)) { | ||
| throw new AnalysisException(s"Function $funcName does not exist in $db database") | ||
| } | ||
| } | ||
|
|
||
| private def assertTableExists(db: String, table: String): Unit = { | ||
| private def requireTableExists(db: String, table: String): Unit = { | ||
| if (!existsTable(db, table)) { | ||
| throw new AnalysisException(s"Table $table does not exist in $db database") | ||
| } | ||
| } | ||
|
|
||
| private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = { | ||
| private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { | ||
| if (!existsPartition(db, table, spec)) { | ||
| throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec") | ||
| } | ||
|
|
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog { | |
| // -------------------------------------------------------------------------- | ||
|
|
||
| override def createDatabase( | ||
| dbDefinition: Database, | ||
| dbDefinition: CatalogDatabase, | ||
| ignoreIfExists: Boolean): Unit = synchronized { | ||
| if (catalog.contains(dbDefinition.name)) { | ||
| if (!ignoreIfExists) { | ||
|
|
@@ -124,17 +119,20 @@ class InMemoryCatalog extends Catalog { | |
| } | ||
| } | ||
|
|
||
| override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized { | ||
| assertDbExists(db) | ||
| assert(db == dbDefinition.name) | ||
| catalog(db).db = dbDefinition | ||
| override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized { | ||
| requireDbExists(dbDefinition.name) | ||
| catalog(dbDefinition.name).db = dbDefinition | ||
| } | ||
|
|
||
| override def getDatabase(db: String): Database = synchronized { | ||
| assertDbExists(db) | ||
| override def getDatabase(db: String): CatalogDatabase = synchronized { | ||
| requireDbExists(db) | ||
| catalog(db).db | ||
| } | ||
|
|
||
| override def databaseExists(db: String): Boolean = synchronized { | ||
| catalog.contains(db) | ||
| } | ||
|
|
||
| override def listDatabases(): Seq[String] = synchronized { | ||
| catalog.keySet.toSeq | ||
| } | ||
|
|
@@ -143,15 +141,17 @@ class InMemoryCatalog extends Catalog { | |
| filterPattern(listDatabases(), pattern) | ||
| } | ||
|
|
||
| override def setCurrentDatabase(db: String): Unit = { /* no-op */ } | ||
|
|
||
| // -------------------------------------------------------------------------- | ||
| // Tables | ||
| // -------------------------------------------------------------------------- | ||
|
|
||
| override def createTable( | ||
| db: String, | ||
| tableDefinition: Table, | ||
| tableDefinition: CatalogTable, | ||
| ignoreIfExists: Boolean): Unit = synchronized { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
| if (existsTable(db, tableDefinition.name)) { | ||
| if (!ignoreIfExists) { | ||
| throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database") | ||
|
|
@@ -165,7 +165,7 @@ class InMemoryCatalog extends Catalog { | |
| db: String, | ||
| table: String, | ||
| ignoreIfNotExists: Boolean): Unit = synchronized { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
| if (existsTable(db, table)) { | ||
| catalog(db).tables.remove(table) | ||
| } else { | ||
|
|
@@ -176,31 +176,30 @@ class InMemoryCatalog extends Catalog { | |
| } | ||
|
|
||
| override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { | ||
| assertTableExists(db, oldName) | ||
| requireTableExists(db, oldName) | ||
| val oldDesc = catalog(db).tables(oldName) | ||
| oldDesc.table = oldDesc.table.copy(name = newName) | ||
| catalog(db).tables.put(newName, oldDesc) | ||
| catalog(db).tables.remove(oldName) | ||
| } | ||
|
|
||
| override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized { | ||
| assertTableExists(db, table) | ||
| assert(table == tableDefinition.name) | ||
| catalog(db).tables(table).table = tableDefinition | ||
| override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized { | ||
| requireTableExists(db, tableDefinition.name) | ||
| catalog(db).tables(tableDefinition.name).table = tableDefinition | ||
| } | ||
|
|
||
| override def getTable(db: String, table: String): Table = synchronized { | ||
| assertTableExists(db, table) | ||
| override def getTable(db: String, table: String): CatalogTable = synchronized { | ||
| requireTableExists(db, table) | ||
| catalog(db).tables(table).table | ||
| } | ||
|
|
||
| override def listTables(db: String): Seq[String] = synchronized { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
| catalog(db).tables.keySet.toSeq | ||
| } | ||
|
|
||
| override def listTables(db: String, pattern: String): Seq[String] = synchronized { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
| filterPattern(listTables(db), pattern) | ||
| } | ||
|
|
||
|
|
@@ -211,9 +210,9 @@ class InMemoryCatalog extends Catalog { | |
| override def createPartitions( | ||
| db: String, | ||
| table: String, | ||
| parts: Seq[TablePartition], | ||
| parts: Seq[CatalogTablePartition], | ||
| ignoreIfExists: Boolean): Unit = synchronized { | ||
| assertTableExists(db, table) | ||
| requireTableExists(db, table) | ||
| val existingParts = catalog(db).tables(table).partitions | ||
| if (!ignoreIfExists) { | ||
| val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec } | ||
|
|
@@ -229,9 +228,9 @@ class InMemoryCatalog extends Catalog { | |
| override def dropPartitions( | ||
| db: String, | ||
| table: String, | ||
| partSpecs: Seq[PartitionSpec], | ||
| partSpecs: Seq[TablePartitionSpec], | ||
| ignoreIfNotExists: Boolean): Unit = synchronized { | ||
| assertTableExists(db, table) | ||
| requireTableExists(db, table) | ||
| val existingParts = catalog(db).tables(table).partitions | ||
| if (!ignoreIfNotExists) { | ||
| val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s } | ||
|
|
@@ -244,75 +243,82 @@ class InMemoryCatalog extends Catalog { | |
| partSpecs.foreach(existingParts.remove) | ||
| } | ||
|
|
||
| override def alterPartition( | ||
| override def renamePartitions( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to still call it |
||
| db: String, | ||
| table: String, | ||
| spec: Map[String, String], | ||
| newPart: TablePartition): Unit = synchronized { | ||
| assertPartitionExists(db, table, spec) | ||
| val existingParts = catalog(db).tables(table).partitions | ||
| if (spec != newPart.spec) { | ||
| // Also a change in specs; remove the old one and add the new one back | ||
| existingParts.remove(spec) | ||
| specs: Seq[TablePartitionSpec], | ||
| newSpecs: Seq[TablePartitionSpec]): Unit = synchronized { | ||
| require(specs.size == newSpecs.size, "number of old and new partition specs differ") | ||
| specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => | ||
| val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec) | ||
| val existingParts = catalog(db).tables(table).partitions | ||
| existingParts.remove(oldSpec) | ||
| existingParts.put(newSpec, newPart) | ||
| } | ||
| } | ||
|
|
||
| override def alterPartitions( | ||
| db: String, | ||
| table: String, | ||
| parts: Seq[CatalogTablePartition]): Unit = synchronized { | ||
| parts.foreach { p => | ||
| requirePartitionExists(db, table, p.spec) | ||
| catalog(db).tables(table).partitions.put(p.spec, p) | ||
| } | ||
| existingParts.put(newPart.spec, newPart) | ||
| } | ||
|
|
||
| override def getPartition( | ||
| db: String, | ||
| table: String, | ||
| spec: Map[String, String]): TablePartition = synchronized { | ||
| assertPartitionExists(db, table, spec) | ||
| spec: TablePartitionSpec): CatalogTablePartition = synchronized { | ||
| requirePartitionExists(db, table, spec) | ||
| catalog(db).tables(table).partitions(spec) | ||
| } | ||
|
|
||
| override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized { | ||
| assertTableExists(db, table) | ||
| override def listPartitions( | ||
| db: String, | ||
| table: String): Seq[CatalogTablePartition] = synchronized { | ||
| requireTableExists(db, table) | ||
| catalog(db).tables(table).partitions.values.toSeq | ||
| } | ||
|
|
||
| // -------------------------------------------------------------------------- | ||
| // Functions | ||
| // -------------------------------------------------------------------------- | ||
|
|
||
| override def createFunction( | ||
| db: String, | ||
| func: Function, | ||
| ignoreIfExists: Boolean): Unit = synchronized { | ||
| assertDbExists(db) | ||
| override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { | ||
| requireDbExists(db) | ||
| if (existsFunction(db, func.name)) { | ||
| if (!ignoreIfExists) { | ||
| throw new AnalysisException(s"Function $func already exists in $db database") | ||
| } | ||
| throw new AnalysisException(s"Function $func already exists in $db database") | ||
| } else { | ||
| catalog(db).functions.put(func.name, func) | ||
| } | ||
| } | ||
|
|
||
| override def dropFunction(db: String, funcName: String): Unit = synchronized { | ||
| assertFunctionExists(db, funcName) | ||
| requireFunctionExists(db, funcName) | ||
| catalog(db).functions.remove(funcName) | ||
| } | ||
|
|
||
| override def alterFunction( | ||
| db: String, | ||
| funcName: String, | ||
| funcDefinition: Function): Unit = synchronized { | ||
| assertFunctionExists(db, funcName) | ||
| if (funcName != funcDefinition.name) { | ||
| // Also a rename; remove the old one and add the new one back | ||
| catalog(db).functions.remove(funcName) | ||
| } | ||
| override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { | ||
| requireFunctionExists(db, oldName) | ||
| val newFunc = getFunction(db, oldName).copy(name = newName) | ||
| catalog(db).functions.remove(oldName) | ||
| catalog(db).functions.put(newName, newFunc) | ||
| } | ||
|
|
||
| override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized { | ||
| requireFunctionExists(db, funcDefinition.name) | ||
| catalog(db).functions.put(funcDefinition.name, funcDefinition) | ||
| } | ||
|
|
||
| override def getFunction(db: String, funcName: String): Function = synchronized { | ||
| assertFunctionExists(db, funcName) | ||
| override def getFunction(db: String, funcName: String): CatalogFunction = synchronized { | ||
| requireFunctionExists(db, funcName) | ||
| catalog(db).functions(funcName) | ||
| } | ||
|
|
||
| override def listFunctions(db: String, pattern: String): Seq[String] = synchronized { | ||
| assertDbExists(db) | ||
| requireDbExists(db) | ||
| filterPattern(catalog(db).functions.keysIterator.toSeq, pattern) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually did not inline a method