4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1068,7 +1068,7 @@ class DataFrame private[sql](
*/
@Experimental
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
if (sqlContext.catalog.tableExists(tableName.split("\\.")) && mode == SaveMode.Append) {
// If table already exists and the save mode is Append,
// we will just call insertInto to append the contents of this DataFrame.
insertInto(tableName, overwrite = false)
@@ -1151,7 +1151,7 @@ class DataFrame private[sql](
options: Map[String, String]): Unit = {
val cmd =
CreateTableUsingAsSelect(
tableName,
tableName.split("\\."),
source,
temporary = false,
mode,
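Both hunks above rely on the same convention: an optionally qualified name is split on "." into the Seq[String] that the catalog APIs now take. A quick illustration with made-up names:

// Illustrative only (names are made up): a dotted name becomes a two-part
// identifier, while a bare name stays a one-element Seq, so the old
// single-table behaviour is unchanged.
val qualified = "sales.orders".split("\\.").toSeq   // Seq("sales", "orders")
val bare      = "orders".split("\\.").toSeq         // Seq("orders")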
6 changes: 3 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -802,7 +802,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
options: Map[String, String]): DataFrame = {
val cmd =
CreateTableUsing(
tableName,
tableName.split("\\."),
userSpecifiedSchema = None,
source,
temporary = false,
@@ -845,7 +845,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
options: Map[String, String]): DataFrame = {
val cmd =
CreateTableUsing(
tableName,
tableName.split("\\."),
userSpecifiedSchema = Some(schema),
source,
temporary = false,
@@ -958,7 +958,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group ddl_ops
*/
def table(tableName: String): DataFrame =
DataFrame(this, catalog.lookupRelation(Seq(tableName)))
DataFrame(this, catalog.lookupRelation(tableName.split("\\.")))

/**
* Returns a [[DataFrame]] containing names of existing tables in the current database.
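Downstream of this, table() (and the createExternalTable variants) accept the dotted form directly. A hypothetical session, assuming sqlContext is an existing context and a database mydb with a table events already exists:

// Hypothetical usage: the qualified name reaches the catalog as Seq("mydb", "events");
// a bare name still resolves against the current database.
val events = sqlContext.table("mydb.events")
val local  = sqlContext.table("events")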
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -329,18 +329,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false, _) =>
case CreateTableUsing(tableIdentifier, userSpecifiedSchema, provider, true, opts, false, _) =>
ExecutedCommand(
CreateTempTableUsing(
tableName, userSpecifiedSchema, provider, opts)) :: Nil
tableIdentifier, userSpecifiedSchema, provider, opts)) :: Nil
case c: CreateTableUsing if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
case c: CreateTableUsing if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
case CreateTableUsingAsSelect(tableIdentifier, provider, true, mode, opts, query) =>
val cmd =
CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
CreateTempTableUsingAsSelect(tableIdentifier, provider, mode, opts, query)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
27 changes: 13 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -83,10 +83,9 @@ private[sql] class DDLParser(
* AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] =
// TODO: Support database.table.
(CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
(CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ rep1sep(ident, ".") ~
(tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableIdentifier ~ columns ~ provider ~ opts ~ query =>
if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
@@ -108,7 +107,7 @@ private[sql] class DDLParser(
}

val queryPlan = parseQuery(query.get)
CreateTableUsingAsSelect(tableName,
CreateTableUsingAsSelect(tableIdentifier,
provider,
temp.isDefined,
mode,
@@ -117,7 +116,7 @@ private[sql] class DDLParser(
} else {
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(
tableName,
tableIdentifier,
userSpecifiedSchema,
provider,
temp.isDefined,
@@ -286,7 +285,7 @@ private[sql] case class DescribeCommand(
* If it is false, an exception will be thrown
*/
private[sql] case class CreateTableUsing(
tableName: String,
tableIdentifier: Seq[String],
userSpecifiedSchema: Option[StructType],
provider: String,
temporary: Boolean,
@@ -301,7 +300,7 @@ private[sql] case class CreateTableUsing(
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
private[sql] case class CreateTableUsingAsSelect(
tableName: String,
tableIdentifier: Seq[String],
provider: String,
temporary: Boolean,
mode: SaveMode,
@@ -312,22 +311,22 @@ private[sql] case class CreateTableUsingAsSelect(
// override lazy val resolved = databaseName != None && childrenResolved
}

private[sql] case class CreateTempTableUsing(
tableName: String,
private [sql] case class CreateTempTableUsing(
tableIdentifier: Seq[String],
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext): Seq[Row] = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableIdentifier.mkString("."))
Seq.empty
}
}

private[sql] case class CreateTempTableUsingAsSelect(
tableName: String,
private [sql] case class CreateTempTableUsingAsSelect(
tableIdentifier: Seq[String],
provider: String,
mode: SaveMode,
options: Map[String, String],
@@ -337,7 +336,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableIdentifier.mkString("."))

Seq.empty
}
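The grammar change swaps a single ident for rep1sep(ident, "."), so the parser yields one Seq[String] element per name part. A standalone sketch of that combinator, using scala.util.parsing.combinator's JavaTokenParsers rather than Spark's own lexer (so the identifier rules differ from the real DDLParser):

import scala.util.parsing.combinator.JavaTokenParsers

// Minimal sketch (not the real DDLParser): rep1sep(ident, ".") parses one or
// more dot-separated identifiers into a list of name parts.
object QualifiedName extends JavaTokenParsers {
  def tableIdentifier: Parser[Seq[String]] = rep1sep(ident, ".")

  def parseName(in: String): Seq[String] = parseAll(tableIdentifier, in) match {
    case Success(parts, _)  => parts
    case failure: NoSuccess => sys.error(failure.msg)
  }
}

// QualifiedName.parseName("orders")       == Seq("orders")
// QualifiedName.parseName("sales.orders") == Seq("sales", "orders")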
sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -107,12 +107,12 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")

case CreateTableUsingAsSelect(tableName, _, _, SaveMode.Overwrite, _, query) =>
case CreateTableUsingAsSelect(tableIdentifier, _, _, SaveMode.Overwrite, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
if (catalog.tableExists(Seq(tableName))) {
if (catalog.tableExists(tableIdentifier)) {
// Need to remove SubQuery operator.
EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
EliminateSubQueries(catalog.lookupRelation(tableIdentifier)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
case l @ LogicalRelation(dest: BaseRelation) =>
@@ -122,7 +122,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
if (srcRelations.contains(dest)) {
failAnalysis(
s"Cannot overwrite table $tableName that is also being read from.")
s"Cannot overwrite table ${tableIdentifier.mkString(".")}" +
s" that is also being read from.")
} else {
// OK
}
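With the identifier threaded through, the overwrite check still rejects a query that reads the table it is about to replace. A hypothetical session that would trip it (the table name is made up, and the table is assumed to be a data source table):

import org.apache.spark.sql.SaveMode

// Hypothetical: fails analysis with
// "Cannot overwrite table sales.orders that is also being read from."
sqlContext.table("sales.orders").saveAsTable("sales.orders", SaveMode.Overwrite)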
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -107,13 +107,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* call this function to invalidate the cache.
*/
def refreshTable(tableName: String): Unit = {
// TODO: Database support...
catalog.refreshTable("default", tableName)
val (dbName, tblName) = catalog.getDBAndTableName(tableName.split("\\."))
catalog.refreshTable(dbName, tblName)
}

protected[hive] def invalidateTable(tableName: String): Unit = {
// TODO: Database support...
catalog.invalidateTable("default", tableName)
val (dbName, tblName) = catalog.getDBAndTableName(tableName.split("\\."))
catalog.invalidateTable(dbName, tblName)
}

/**
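refreshTable and invalidateTable now resolve an optionally qualified name instead of assuming the default database. Hypothetical usage, with hiveContext an existing HiveContext and made-up names:

// Hypothetical usage: both forms work; the bare name falls back to the
// session's current database via getDBAndTableName.
hiveContext.refreshTable("mydb.events")
hiveContext.refreshTable("events")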
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -56,19 +56,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)

// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String) {
def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
}

/** A cache of Spark SQL data source tables that have been accessed. */
protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
protected[hive] val cachedDataSourceTables: LoadingCache[Seq[String], LogicalPlan] = {
val cacheLoader = new CacheLoader[Seq[String], LogicalPlan]() {
override def load(in: Seq[String]): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
val table = HiveMetastoreCatalog.this.synchronized {
client.getTable(in.database, in.name)
client.getTable(in(0), in(1))
}

def schemaStringFromParts: Option[String] = {
@@ -128,22 +122,30 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}

def invalidateTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
cachedDataSourceTables.invalidate(Seq(databaseName, tableName).map(_.toLowerCase()))
}

val caseSensitive: Boolean = false

private[hive] def getDBAndTableName(tableIdentifier: Seq[String]): (String, String) = {
val tableIdent = processTableIdentifier(tableIdentifier)
val dbName = tableIdent.lift(tableIdent.size - 2).getOrElse(
hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
(dbName, tblName)
}

/**
* Creates a data source table (a table created with USING clause) in Hive's metastore.
* Returns true when the table has been created. Otherwise, false.
*/
def createDataSourceTable(
tableName: String,
tableIdentifier: Seq[String],
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val (dbName, tblName) = getDBAndTableName(tableIdentifier)
val tbl = new Table(dbName, tblName)

tbl.setProperty("spark.sql.sources.provider", provider)
@@ -174,29 +176,24 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}

def hiveDefaultTableFilePath(tableName: String): String = synchronized {
val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
val (databaseName, tblName) = getDBAndTableName(tableName.split("\\."))
val db = client.getDatabase(databaseName)

hiveWarehouse.getTablePath(currentDatabase, tableName).toString
hiveWarehouse.getTablePath(db, tblName).toString
}

def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName =
tableIdent
.lift(tableIdent.size - 2)
.getOrElse(hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
val (databaseName, tblName) = getDBAndTableName(tableIdentifier)
client.getTable(databaseName, tblName, false) != null
}

def lookupRelation(
tableIdentifier: Seq[String],
alias: Option[String]): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
val table = try {

val (databaseName, tblName) = getDBAndTableName(tableIdentifier)
val table = try {
synchronized {
client.getTable(databaseName, tblName)
}
@@ -207,7 +204,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

if (table.getProperty("spark.sql.sources.provider") != null) {
val dataSourceTable =
cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
cachedDataSourceTables(Seq(databaseName, tblName).map(_.toLowerCase))
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
val withAlias =
@@ -245,10 +242,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
Seq(metastoreRelation.databaseName, metastoreRelation.tableName)

def getCached(
tableIdentifier: QualifiedTableName,
tableIdentifier: Seq[String],
pathsInMetastore: Seq[String],
schemaInMetastore: StructType,
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
@@ -331,23 +328,21 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

/**
* Create table with specified database, table name, table description and schema
* @param databaseName Database Name
* @param tableName Table Name
* @param tableIdentifier A fully qualified identifier for a table (i.e., database.tableName)
* @param schema Schema of the new table, if not specified, will use the schema
* specified in crtTbl
* @param allowExisting if true, ignore AlreadyExistsException
* @param desc CreateTableDesc object which contains the SerDe info. Currently
* we support most of the features except the bucket.
*/
def createTable(
databaseName: String,
tableName: String,
tableIdentifier: Seq[String],
schema: Seq[Attribute],
allowExisting: Boolean = false,
desc: Option[CreateTableDesc] = None) {
val hconf = hive.hiveconf

val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
val (dbName, tblName) = getDBAndTableName(tableIdentifier)
val tbl = new Table(dbName, tblName)

val crtTbl: CreateTableDesc = desc.getOrElse(null)
@@ -602,6 +597,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Need to think about how to implement the CreateTableAsSelect.resolved
case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val tableIdentifier = if(dbName.nonEmpty) {
Seq(dbName.get, tblName)
} else {
Seq(tblName)
}
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)

// Get the CreateTableDesc from Hive SemanticAnalyzer
@@ -641,7 +641,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
tableIdentifier,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
@@ -661,6 +661,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val tableIdentifier = if(dbName.nonEmpty) {
Seq(dbName.get, tblName)
} else {
Seq(tblName)
}
if (hive.convertCTAS) {
if (dbName.isDefined) {
throw new AnalysisException(
@@ -670,7 +675,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
tableIdentifier,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
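The resolution rule that getDBAndTableName centralizes is: take the second-to-last part as the database when it is present, otherwise fall back to the current database, and always take the last part as the table. A simplified standalone sketch (it skips processTableIdentifier's case normalization, and currentDatabase stands in for hive.sessionState.getCurrentDatabase):

// Simplified sketch of getDBAndTableName: lift(size - 2) returns the database
// part for a two-part identifier and None for a bare table name.
def getDBAndTableName(tableIdentifier: Seq[String], currentDatabase: String): (String, String) = {
  val dbName = tableIdentifier.lift(tableIdentifier.size - 2).getOrElse(currentDatabase)
  (dbName, tableIdentifier.last)
}

// getDBAndTableName(Seq("sales", "orders"), "default") == ("sales", "orders")
// getDBAndTableName(Seq("orders"), "default")          == ("default", "orders")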
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -863,7 +863,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

val tableIdent =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => Seq(tableOnly)
case Seq(tableName) => tableName.split("\\.").toSeq
case Seq(databaseName, table) => Seq(databaseName, table)
case other => sys.error("Hive only supports tables names like 'tableName' " +
s"or 'databaseName.tableName', found '$other'")
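The HiveQl hunk handles the case where the parser delivers the whole qualified name as one token; splitting it on the dot makes that case line up with the two-token database/table case. A small sketch of the normalization, with made-up names:

// Illustrative only: one- and two-token name parts end up in the same shape.
def toTableIdent(parts: Seq[String]): Seq[String] = parts match {
  case Seq(tableName)           => tableName.split("\\.").toSeq
  case Seq(databaseName, table) => Seq(databaseName, table)
  case other                    => sys.error(s"unexpected table name parts: $other")
}

// toTableIdent(Seq("sales.orders"))    == Seq("sales", "orders")
// toTableIdent(Seq("sales", "orders")) == Seq("sales", "orders")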