36 commits
3b66605  Add skeleton for HiveCatalog (Feb 10, 2016)
f3e094a  Implement createDatabase (Feb 10, 2016)
4b09a7d  Fix style (Feb 10, 2016)
526f278  Implement dropDatabase (Feb 10, 2016)
4aa6e66  Implement alterDatabase (Feb 10, 2016)
433d180  Implement getDatabase, listDatabases and databaseExists (Feb 10, 2016)
ff5c5be  Implement createTable (Feb 10, 2016)
ff49f0c  Explicitly mark methods with override in HiveCatalog (Feb 10, 2016)
ca98c00  Implement dropTable (Feb 10, 2016)
71f9964  Implement renameTable, alterTable (Feb 10, 2016)
13795d8  Remove intermediate representation of tables, columns etc. (Feb 12, 2016)
af5ffc0  Remove TableType enum (Feb 12, 2016)
d7b18e6  Re-implement all table operations after the refactor (Feb 12, 2016)
a915d01  Implement all partition operations (Feb 12, 2016)
3ceb88d  Implement all function operations (Feb 12, 2016)
07332ad  Simplify alterDatabase (Feb 12, 2016)
cdf1f70  Clean up HiveClientImpl a little (Feb 12, 2016)
bbb8170  Merge branch 'master' of github.com:apache/spark into hive-catalog (Feb 12, 2016)
2b72025  Fix tests? (Feb 12, 2016)
5e2cd3a  Miscellaneous cleanup (Feb 12, 2016)
6519c2a  Merge branch 'master' of github.com:apache/spark into hive-catalog (Feb 16, 2016)
7d58fac  Address comments + minor cleanups (Feb 16, 2016)
1c05b9b  Fix wrong Hive TableType issue (Feb 16, 2016)
4ecc3b1  Fix CREATE TABLE serde setting (Feb 16, 2016)
863ebd0  Fix NPE in CREATE VIEW (Feb 16, 2016)
5394492  Change CatalogColumn#dataType to String (Feb 17, 2016)
fe295fb  Fix style (Feb 17, 2016)
43e3c66  Fix MetastoreDataSourcesSuite (Feb 18, 2016)
2765649  Add HiveCatalogSuite (Feb 18, 2016)
428c3c5  Merge branch 'master' of github.com:apache/spark into hive-catalog (Feb 18, 2016)
ed9c6fa  Merge branch 'master' of github.com:apache/spark into hive-catalog (Feb 18, 2016)
cb288da  Miscellaneous clean ups (Feb 18, 2016)
2ba1990  Un-ignore alter partitions test (Feb 19, 2016)
d9a7723  Fix linking error in tests (Feb 19, 2016)
df0ad86  Merge pull request #11189 from andrewor14/hive-catalog (rxin, Feb 21, 2016)
6703aa5  code review (rxin, Feb 21, 2016)
@@ -19,6 +19,9 @@ package org.apache.spark.sql

 import org.apache.spark.annotation.DeveloperApi
 
+
+// TODO: don't swallow original stack trace if it exists
+
 /**
  * :: DeveloperApi ::
  * Thrown when a query fails to analyze, usually because the query itself is invalid.
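The TODO above flags that AnalysisException currently drops the originating exception when an error is rethrown. A minimal sketch of one way to address it, assuming the constructor can be widened with an optional cause; the `cause` parameter is illustrative and not part of this diff:

```scala
// Sketch only: thread an optional cause through to Exception so the original
// stack trace survives the rethrow. `cause` is a hypothetical addition.
class AnalysisException(
    val message: String,
    val line: Option[Int] = None,
    val startPosition: Option[Int] = None,
    val cause: Option[Throwable] = None)
  extends Exception(message, cause.orNull)
```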
@@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-
-/**
- * Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
- * as an AnalysisException with the correct position information.
- */
-class NoSuchTableException extends Exception
-
-class NoSuchDatabaseException extends Exception
-
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+
+
+/**
+ * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+abstract class NoSuchItemException extends Exception {
+  override def getMessage: String
+}
+
+class NoSuchDatabaseException(db: String) extends NoSuchItemException {
+  override def getMessage: String = s"Database $db not found"
+}
+
+class NoSuchTableException(db: String, table: String) extends NoSuchItemException {
+  override def getMessage: String = s"Table $table not found in database $db"
+}
+
+class NoSuchPartitionException(
+    db: String,
+    table: String,
+    spec: TablePartitionSpec)
+  extends NoSuchItemException {
+
+  override def getMessage: String = {
+    s"Partition not found in table $table database $db:\n" + spec.mkString("\n")
+  }
+}
+
+class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException {
+  override def getMessage: String = s"Function $func not found in database $db"
+}
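A usage sketch of how the analyzer side might consume these, per the scaladoc above. This assumes an AnalysisException constructor taking line/startPosition is visible from the call site (in Spark it is protected[sql]), so it is an illustration rather than code from this PR:

```scala
import org.apache.spark.sql.AnalysisException

// Run `body`, converting catalog lookup failures into position-annotated
// analysis errors. getMessage is the only abstract member of
// NoSuchItemException, so this single case covers all four subclasses.
def withPosition[T](line: Option[Int], startPosition: Option[Int])(body: => T): T = {
  try body
  catch {
    case e: NoSuchItemException =>
      throw new AnalysisException(e.getMessage, line, startPosition)
  }
}
```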
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException
 class InMemoryCatalog extends Catalog {
   import Catalog._
 
-  private class TableDesc(var table: Table) {
-    val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
+  private class TableDesc(var table: CatalogTable) {
+    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
   }
 
-  private class DatabaseDesc(var db: Database) {
+  private class DatabaseDesc(var db: CatalogDatabase) {
     val tables = new mutable.HashMap[String, TableDesc]
-    val functions = new mutable.HashMap[String, Function]
+    val functions = new mutable.HashMap[String, CatalogFunction]
   }
 
+  // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
 
   private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def existsFunction(db: String, funcName: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
   private def existsTable(db: String, table: String): Boolean = {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.contains(table)
   }
 
-  private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
-    assertTableExists(db, table)
+  private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def assertDbExists(db: String): Unit = {
-    if (!catalog.contains(db)) {
-      throw new AnalysisException(s"Database $db does not exist")
-    }
-  }
-
-  private def assertFunctionExists(db: String, funcName: String): Unit = {
+  private def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!existsFunction(db, funcName)) {
       throw new AnalysisException(s"Function $funcName does not exist in $db database")
     }
   }
 
-  private def assertTableExists(db: String, table: String): Unit = {
+  private def requireTableExists(db: String, table: String): Unit = {
     if (!existsTable(db, table)) {
       throw new AnalysisException(s"Table $table does not exist in $db database")
     }
   }
 
-  private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+  private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
     if (!existsPartition(db, table, spec)) {
       throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
     }
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog {
   // --------------------------------------------------------------------------
 
   override def createDatabase(
-      dbDefinition: Database,
+      dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
       if (!ignoreIfExists) {
@@ -124,17 +119,20 @@
     }
   }
 
-  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
-    assertDbExists(db)
-    assert(db == dbDefinition.name)
-    catalog(db).db = dbDefinition
+  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+    requireDbExists(dbDefinition.name)
+    catalog(dbDefinition.name).db = dbDefinition
   }
 
-  override def getDatabase(db: String): Database = synchronized {
-    assertDbExists(db)
+  override def getDatabase(db: String): CatalogDatabase = synchronized {
+    requireDbExists(db)
     catalog(db).db
   }
 
+  override def databaseExists(db: String): Boolean = synchronized {
+    catalog.contains(db)
+  }
+
   override def listDatabases(): Seq[String] = synchronized {
     catalog.keySet.toSeq
   }
@@ -143,15 +141,17 @@
     filterPattern(listDatabases(), pattern)
   }
 
+  override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
+
   // --------------------------------------------------------------------------
   // Tables
   // --------------------------------------------------------------------------
 
   override def createTable(
       db: String,
-      tableDefinition: Table,
+      tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, tableDefinition.name)) {
       if (!ignoreIfExists) {
         throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
@@ -165,7 +165,7 @@
       db: String,
       table: String,
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     if (existsTable(db, table)) {
       catalog(db).tables.remove(table)
     } else {
@@ -176,31 +176,30 @@
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
-    assertTableExists(db, oldName)
+    requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
     oldDesc.table = oldDesc.table.copy(name = newName)
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
 
-  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
-    assertTableExists(db, table)
-    assert(table == tableDefinition.name)
-    catalog(db).tables(table).table = tableDefinition
+  override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
+    requireTableExists(db, tableDefinition.name)
+    catalog(db).tables(tableDefinition.name).table = tableDefinition
   }
 
-  override def getTable(db: String, table: String): Table = synchronized {
-    assertTableExists(db, table)
+  override def getTable(db: String, table: String): CatalogTable = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).table
   }
 
   override def listTables(db: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     catalog(db).tables.keySet.toSeq
   }
 
   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(listTables(db), pattern)
   }
 
@@ -211,9 +210,9 @@
   override def createPartitions(
       db: String,
       table: String,
-      parts: Seq[TablePartition],
+      parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfExists) {
       val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
@@ -229,9 +228,9 @@
   override def dropPartitions(
       db: String,
       table: String,
-      partSpecs: Seq[PartitionSpec],
+      partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = synchronized {
-    assertTableExists(db, table)
+    requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
       val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
@@ -244,75 +243,82 @@
     partSpecs.foreach(existingParts.remove)
   }
 
-  override def alterPartition(
+  override def renamePartitions(
       db: String,
       table: String,
-      spec: Map[String, String],
-      newPart: TablePartition): Unit = synchronized {
-    assertPartitionExists(db, table, spec)
-    val existingParts = catalog(db).tables(table).partitions
-    if (spec != newPart.spec) {
-      // Also a change in specs; remove the old one and add the new one back
-      existingParts.remove(spec)
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
+    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
+      val existingParts = catalog(db).tables(table).partitions
+      existingParts.remove(oldSpec)
+      existingParts.put(newSpec, newPart)
+    }
+  }
+
+  override def alterPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Unit = synchronized {
+    parts.foreach { p =>
+      requirePartitionExists(db, table, p.spec)
+      catalog(db).tables(table).partitions.put(p.spec, p)
     }
-    existingParts.put(newPart.spec, newPart)
   }
 
   override def getPartition(
       db: String,
       table: String,
-      spec: Map[String, String]): TablePartition = synchronized {
-    assertPartitionExists(db, table, spec)
+      spec: TablePartitionSpec): CatalogTablePartition = synchronized {
+    requirePartitionExists(db, table, spec)
     catalog(db).tables(table).partitions(spec)
   }
 
-  override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
-    assertTableExists(db, table)
+  override def listPartitions(
+      db: String,
+      table: String): Seq[CatalogTablePartition] = synchronized {
+    requireTableExists(db, table)
     catalog(db).tables(table).partitions.values.toSeq
   }
 
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
 
-  override def createFunction(
-      db: String,
-      func: Function,
-      ignoreIfExists: Boolean): Unit = synchronized {
-    assertDbExists(db)
+  override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+    requireDbExists(db)
     if (existsFunction(db, func.name)) {
-      if (!ignoreIfExists) {
-        throw new AnalysisException(s"Function $func already exists in $db database")
-      }
+      throw new AnalysisException(s"Function $func already exists in $db database")
     } else {
       catalog(db).functions.put(func.name, func)
     }
   }
 
   override def dropFunction(db: String, funcName: String): Unit = synchronized {
-    assertFunctionExists(db, funcName)
+    requireFunctionExists(db, funcName)
     catalog(db).functions.remove(funcName)
   }
 
-  override def alterFunction(
-      db: String,
-      funcName: String,
-      funcDefinition: Function): Unit = synchronized {
-    assertFunctionExists(db, funcName)
-    if (funcName != funcDefinition.name) {
-      // Also a rename; remove the old one and add the new one back
-      catalog(db).functions.remove(funcName)
-    }
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+    requireFunctionExists(db, oldName)
+    val newFunc = getFunction(db, oldName).copy(name = newName)
+    catalog(db).functions.remove(oldName)
+    catalog(db).functions.put(newName, newFunc)
+  }
+
+  override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
+    requireFunctionExists(db, funcDefinition.name)
     catalog(db).functions.put(funcDefinition.name, funcDefinition)
   }
 
-  override def getFunction(db: String, funcName: String): Function = synchronized {
-    assertFunctionExists(db, funcName)
+  override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
+    requireFunctionExists(db, funcName)
     catalog(db).functions(funcName)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
-    assertDbExists(db)
+    requireDbExists(db)
     filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 
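renameTable, renamePartitions and renameFunction above all follow the same remove-copy-put pattern against the backing mutable map. A self-contained sketch of that pattern, with the made-up `Entry` standing in for CatalogTable, CatalogTablePartition or CatalogFunction:

```scala
import scala.collection.mutable

// Stand-in for a catalog object that carries its own name.
case class Entry(name: String, payload: String)

object RenamePattern {
  // Rename by copying: look up the old entry, copy it under the new name,
  // drop the old key, insert the new key. Mirrors InMemoryCatalog's renames.
  def rename(registry: mutable.HashMap[String, Entry], oldName: String, newName: String): Unit = {
    require(registry.contains(oldName), s"$oldName not found")
    val renamed = registry(oldName).copy(name = newName)
    registry.remove(oldName)
    registry.put(newName, renamed)
  }

  def main(args: Array[String]): Unit = {
    val reg = mutable.HashMap("f1" -> Entry("f1", "body"))
    rename(reg, "f1", "f2")
    println(reg) // Map(f2 -> Entry(f2,body))
  }
}
```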