diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 03d9834d1d13..1351f8d38026 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1068,7 +1068,7 @@ class DataFrame private[sql](
    */
   @Experimental
   def saveAsTable(tableName: String, mode: SaveMode): Unit = {
-    if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
+    if (sqlContext.catalog.tableExists(tableName.split("\\.")) && mode == SaveMode.Append) {
       // If table already exists and the save mode is Append,
       // we will just call insertInto to append the contents of this DataFrame.
       insertInto(tableName, overwrite = false)
@@ -1151,7 +1151,7 @@ class DataFrame private[sql](
       options: Map[String, String]): Unit = {
     val cmd =
       CreateTableUsingAsSelect(
-        tableName,
+        tableName.split("\\."),
         source,
         temporary = false,
         mode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bcd20c06c6dc..9016513ebf02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -802,7 +802,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       options: Map[String, String]): DataFrame = {
     val cmd =
       CreateTableUsing(
-        tableName,
+        tableName.split("\\."),
         userSpecifiedSchema = None,
         source,
         temporary = false,
@@ -845,7 +845,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       options: Map[String, String]): DataFrame = {
     val cmd =
       CreateTableUsing(
-        tableName,
+        tableName.split("\\."),
         userSpecifiedSchema = Some(schema),
         source,
         temporary = false,
@@ -958,7 +958,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group ddl_ops
    */
   def table(tableName: String): DataFrame =
-    DataFrame(this, catalog.lookupRelation(Seq(tableName)))
+    DataFrame(this, catalog.lookupRelation(tableName.split("\\.")))
 
   /**
    * Returns a [[DataFrame]] containing names of existing tables in the current database.
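The String-based entry points above now split the table name on "." before handing it to the catalog, so a qualified name reaches lookupRelation/tableExists as a multi-part identifier. A minimal, self-contained sketch of what that split produces (the table names are only examples, not part of the patch):

    object TableIdentSplitExample {
      def main(args: Array[String]): Unit = {
        // What catalog.lookupRelation / catalog.tableExists now receive:
        val qualified: Seq[String] = "db1.jsonTable".split("\\.")   // Seq("db1", "jsonTable")
        val unqualified: Seq[String] = "jsonTable".split("\\.")     // Seq("jsonTable")
        println(qualified.mkString(", "))
        println(unqualified.mkString(", "))
      }
    }
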
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 030ef118f75d..d41bda0534d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -329,18 +329,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false, _) =>
+      case CreateTableUsing(tableIdentifier, userSpecifiedSchema, provider, true, opts, false, _) =>
         ExecutedCommand(
           CreateTempTableUsing(
-            tableName, userSpecifiedSchema, provider, opts)) :: Nil
+            tableIdentifier, userSpecifiedSchema, provider, opts)) :: Nil
       case c: CreateTableUsing if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
       case c: CreateTableUsing if c.temporary && c.allowExisting =>
         sys.error("allowExisting should be set to false when creating a temporary table.")
 
-      case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
+      case CreateTableUsingAsSelect(tableIdentifier, provider, true, mode, opts, query) =>
         val cmd =
-          CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
+          CreateTempTableUsingAsSelect(tableIdentifier, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
       case c: CreateTableUsingAsSelect if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 78d494184e75..60f436d6f928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -83,10 +83,9 @@ private[sql] class DDLParser(
    * AS SELECT ...
    */
   protected lazy val createTable: Parser[LogicalPlan] =
-    // TODO: Support database.table.
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
-      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
-      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ rep1sep(ident, ".") ~
+      (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+      case temp ~ allowExisting ~ tableIdentifier ~ columns ~ provider ~ opts ~ query =>
         if (temp.isDefined && allowExisting.isDefined) {
           throw new DDLException(
             "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
@@ -108,7 +107,7 @@ private[sql] class DDLParser(
           }
 
           val queryPlan = parseQuery(query.get)
-          CreateTableUsingAsSelect(tableName,
+          CreateTableUsingAsSelect(tableIdentifier,
             provider,
             temp.isDefined,
             mode,
@@ -117,7 +116,7 @@ private[sql] class DDLParser(
         } else {
           val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
           CreateTableUsing(
-            tableName,
+            tableIdentifier,
             userSpecifiedSchema,
             provider,
             temp.isDefined,
@@ -286,7 +285,7 @@ private[sql] case class DescribeCommand(
  * If it is false, an exception will be thrown
  */
 private[sql] case class CreateTableUsing(
-    tableName: String,
+    tableIdentifier: Seq[String],
     userSpecifiedSchema: Option[StructType],
     provider: String,
     temporary: Boolean,
@@ -301,7 +300,7 @@ private[sql] case class CreateTableUsing(
  * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
 private[sql] case class CreateTableUsingAsSelect(
-    tableName: String,
+    tableIdentifier: Seq[String],
     provider: String,
     temporary: Boolean,
     mode: SaveMode,
@@ -312,8 +311,8 @@ private[sql] case class CreateTableUsingAsSelect(
   // override lazy val resolved = databaseName != None && childrenResolved
 }
 
-private[sql] case class CreateTempTableUsing(
-    tableName: String,
+private [sql] case class CreateTempTableUsing(
+    tableIdentifier: Seq[String],
     userSpecifiedSchema: Option[StructType],
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
@@ -321,13 +320,13 @@ private[sql] case class CreateTempTableUsing(
   def run(sqlContext: SQLContext): Seq[Row] = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
     sqlContext.registerDataFrameAsTable(
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableIdentifier.mkString("."))
     Seq.empty
   }
 }
 
-private[sql] case class CreateTempTableUsingAsSelect(
-    tableName: String,
+private [sql] case class CreateTempTableUsingAsSelect(
+    tableIdentifier: Seq[String],
     provider: String,
     mode: SaveMode,
     options: Map[String, String],
@@ -337,7 +336,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
     val df = DataFrame(sqlContext, query)
     val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
     sqlContext.registerDataFrameAsTable(
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableIdentifier.mkString("."))
     Seq.empty
   }
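The parser change above replaces a single `ident` with `rep1sep(ident, ".")`, so CREATE TABLE ... USING now accepts a dot-separated, multi-part table identifier. A simplified, self-contained stand-in (not Spark's actual DDLParser; the regex used for `ident` here is an assumption) showing how rep1sep splits such a name:

    import scala.util.parsing.combinator.RegexParsers

    // Illustrates only the identifier rule; everything else in the real parser is omitted.
    object TableIdentifierParser extends RegexParsers {
      val ident: Parser[String] = """[a-zA-Z_][a-zA-Z0-9_]*""".r
      val tableIdentifier: Parser[Seq[String]] = rep1sep(ident, ".")

      def main(args: Array[String]): Unit = {
        println(parseAll(tableIdentifier, "db1.jsonTable"))  // parsed: List(db1, jsonTable)
        println(parseAll(tableIdentifier, "jsonTable"))      // parsed: List(jsonTable)
      }
    }
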
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 6ed68d179edc..643b874c5ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -107,12 +107,12 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // The relation in l is not an InsertableRelation.
           failAnalysis(s"$l does not allow insertion.")
 
-      case CreateTableUsingAsSelect(tableName, _, _, SaveMode.Overwrite, _, query) =>
+      case CreateTableUsingAsSelect(tableIdentifier, _, _, SaveMode.Overwrite, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
+        if (catalog.tableExists(tableIdentifier)) {
           // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
+          EliminateSubQueries(catalog.lookupRelation(tableIdentifier)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation) =>
@@ -122,7 +122,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
               }
               if (srcRelations.contains(dest)) {
                 failAnalysis(
-                  s"Cannot overwrite table $tableName that is also being read from.")
+                  s"Cannot overwrite table ${tableIdentifier.mkString(".")}" +
+                    s" that is also being read from.")
               } else {
                 // OK
               }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c4a73b300407..ef1580c44ef8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -107,13 +107,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * call this function to invalidate the cache.
    */
   def refreshTable(tableName: String): Unit = {
-    // TODO: Database support...
-    catalog.refreshTable("default", tableName)
+    val (dbName, tblName) = catalog.getDBAndTableName(tableName.split("\\."))
+    catalog.refreshTable(dbName, tblName)
   }
 
   protected[hive] def invalidateTable(tableName: String): Unit = {
-    // TODO: Database support...
-    catalog.invalidateTable("default", tableName)
+    val (dbName, tblName) = catalog.getDBAndTableName(tableName.split("\\."))
+    catalog.invalidateTable(dbName, tblName)
   }
 
   /**
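refreshTable and invalidateTable above both delegate the database/table split to the new catalog.getDBAndTableName helper introduced in the HiveMetastoreCatalog changes below. A standalone sketch of that resolution rule (leaving out processTableIdentifier's case normalization; `currentDatabase` stands in for hive.sessionState.getCurrentDatabase):

    object ResolveTableIdentExample {
      // Last element is the table name; the element before it, if any, is the database;
      // otherwise fall back to the current database.
      def getDBAndTableName(tableIdentifier: Seq[String], currentDatabase: String): (String, String) = {
        val dbName = tableIdentifier.lift(tableIdentifier.size - 2).getOrElse(currentDatabase)
        (dbName, tableIdentifier.last)
      }

      def main(args: Array[String]): Unit = {
        println(getDBAndTableName(Seq("db1", "jsonTable"), "default"))            // (db1,jsonTable)
        println(getDBAndTableName(Seq("jsonTable"), "default"))                   // (default,jsonTable)
        println(getDBAndTableName("x.y.db1.jsonTable".split("\\."), "default"))   // (db1,jsonTable)
      }
    }
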
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f1c0bd92aa23..50e7b6300a4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -56,19 +56,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Usages should lock on `this`. */
   protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
 
-  // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
-  /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String) {
-    def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
-  }
-
   /** A cache of Spark SQL data source tables that have been accessed. */
-  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
-    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
-      override def load(in: QualifiedTableName): LogicalPlan = {
+  protected[hive] val cachedDataSourceTables: LoadingCache[Seq[String], LogicalPlan] = {
+    val cacheLoader = new CacheLoader[Seq[String], LogicalPlan]() {
+      override def load(in: Seq[String]): LogicalPlan = {
         logDebug(s"Creating new cached data source for $in")
         val table = HiveMetastoreCatalog.this.synchronized {
-          client.getTable(in.database, in.name)
+          client.getTable(in(0), in(1))
         }
 
         def schemaStringFromParts: Option[String] = {
@@ -128,22 +122,30 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
+    cachedDataSourceTables.invalidate(Seq(databaseName, tableName).map(_.toLowerCase()))
   }
 
   val caseSensitive: Boolean = false
 
+  private[hive] def getDBAndTableName(tableIdentifier: Seq[String]): (String, String) = {
+    val tableIdent = processTableIdentifier(tableIdentifier)
+    val dbName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+      hive.sessionState.getCurrentDatabase)
+    val tblName = tableIdent.last
+    (dbName, tblName)
+  }
+
   /**
    * Creates a data source table (a table created with USING clause) in Hive's metastore.
    * Returns true when the table has been created. Otherwise, false.
    */
   def createDataSourceTable(
-      tableName: String,
+      tableIdentifier: Seq[String],
       userSpecifiedSchema: Option[StructType],
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
-    val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+    val (dbName, tblName) = getDBAndTableName(tableIdentifier)
     val tbl = new Table(dbName, tblName)
 
     tbl.setProperty("spark.sql.sources.provider", provider)
@@ -174,18 +176,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   def hiveDefaultTableFilePath(tableName: String): String = synchronized {
-    val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
+    val (databaseName, tblName) = getDBAndTableName(tableName.split("\\."))
+    val db = client.getDatabase(databaseName)
 
-    hiveWarehouse.getTablePath(currentDatabase, tableName).toString
+    hiveWarehouse.getTablePath(db, tblName).toString
   }
 
   def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName =
-      tableIdent
-        .lift(tableIdent.size - 2)
-        .getOrElse(hive.sessionState.getCurrentDatabase)
-    val tblName = tableIdent.last
+    val (databaseName, tblName) = getDBAndTableName(tableIdentifier)
     client.getTable(databaseName, tblName, false) != null
   }
 
@@ -193,10 +191,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       tableIdentifier: Seq[String],
       alias: Option[String]): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
-      hive.sessionState.getCurrentDatabase)
-    val tblName = tableIdent.last
-    val table = try {
+
+    val (databaseName, tblName) = getDBAndTableName(tableIdentifier)
+    val table = try {
       synchronized {
         client.getTable(databaseName, tblName)
       }
@@ -207,7 +204,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     if (table.getProperty("spark.sql.sources.provider") != null) {
       val dataSourceTable =
-        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+        cachedDataSourceTables(Seq(databaseName, tblName).map(_.toLowerCase))
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Othersie, wrap the table with a Subquery using the table name.
       val withAlias =
@@ -245,10 +242,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
         ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
       val tableIdentifier =
-        QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+        Seq(metastoreRelation.databaseName, metastoreRelation.tableName)
 
       def getCached(
-          tableIdentifier: QualifiedTableName,
+          tableIdentifier: Seq[String],
           pathsInMetastore: Seq[String],
           schemaInMetastore: StructType,
           partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
@@ -331,8 +328,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   /**
    * Create table with specified database, table name, table description and schema
-   * @param databaseName Database Name
-   * @param tableName Table Name
+   * @param tableIdentifier A fully qualified identifier for a table (i.e., database.tableName)
    * @param schema Schema of the new table, if not specified, will use the schema
    *               specified in crtTbl
    * @param allowExisting if true, ignore AlreadyExistsException
@@ -340,14 +336,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
    * we support most of the features except the bucket.
    */
   def createTable(
-      databaseName: String,
-      tableName: String,
+      tableIdentifier: Seq[String],
       schema: Seq[Attribute],
       allowExisting: Boolean = false,
       desc: Option[CreateTableDesc] = None) {
     val hconf = hive.hiveconf
 
-    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    val (dbName, tblName) = getDBAndTableName(tableIdentifier)
     val tbl = new Table(dbName, tblName)
 
     val crtTbl: CreateTableDesc = desc.getOrElse(null)
@@ -602,6 +597,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Need to think about how to implement the CreateTableAsSelect.resolved
       case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+        val tableIdentifier = if (dbName.nonEmpty) {
+          Seq(dbName.get, tblName)
+        } else {
+          Seq(tblName)
+        }
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
         // Get the CreateTableDesc from Hive SemanticAnalyzer
@@ -641,7 +641,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
 
         CreateTableUsingAsSelect(
-          tblName,
+          tableIdentifier,
           hive.conf.defaultDataSourceName,
           temporary = false,
           mode,
@@ -661,6 +661,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
       val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+      val tableIdentifier = if (dbName.nonEmpty) {
+        Seq(dbName.get, tblName)
+      } else {
+        Seq(tblName)
+      }
       if (hive.convertCTAS) {
         if (dbName.isDefined) {
           throw new AnalysisException(
@@ -670,7 +675,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
 
         CreateTableUsingAsSelect(
-          tblName,
+          tableIdentifier,
           hive.conf.defaultDataSourceName,
           temporary = false,
           mode,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 85061f22772d..5342ef7ce423 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -863,7 +863,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     val tableIdent =
       tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
-        case Seq(tableOnly) => Seq(tableOnly)
+        case Seq(tableName) => tableName.split("\\.").toSeq
         case Seq(databaseName, table) => Seq(databaseName, table)
         case other => sys.error("Hive only supports tables names like 'tableName' " +
           s"or 'databaseName.tableName', found '$other'")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a6f4fbe8aba0..4a736f0c7f8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -221,14 +221,25 @@ private[hive] trait HiveStrategies {
   object HiveDDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case CreateTableUsing(
-        tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
+        tableIdentifier,
+        userSpecifiedSchema,
+        provider,
+        false,
+        opts,
+        allowExisting,
+        managedIfNoPath) =>
         ExecutedCommand(
           CreateMetastoreDataSource(
-            tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
+            tableIdentifier,
+            userSpecifiedSchema,
+            provider,
+            opts,
+            allowExisting,
+            managedIfNoPath)) :: Nil
 
-      case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
+      case CreateTableUsingAsSelect(tableIdentifier, provider, false, mode, opts, query) =>
         val cmd =
-          CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
+          CreateMetastoreDataSourceAsSelect(tableIdentifier, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
 
       case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 76a1965f3cb2..e44cf3bc69da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -49,7 +49,7 @@ case class CreateTableAsSelect(
   val hiveContext = sqlContext.asInstanceOf[HiveContext]
   lazy val metastoreRelation: MetastoreRelation = {
     // Create Hive Table
-    hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
+    hiveContext.catalog.createTable(Seq(database, tableName), query.output, allowExisting, desc)
 
     // Get the Metastore Relation
     hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index a40a1e53117c..cb52b837ff84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -68,7 +68,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(Seq(tableName))
+    hiveContext.catalog.unregisterTable(tableName.split("\\."))
     Seq.empty[Row]
   }
 }
@@ -103,7 +103,7 @@ case class AddFile(path: String) extends RunnableCommand {
 
 private[hive]
 case class CreateMetastoreDataSource(
-    tableName: String,
+    tableIdentifier: Seq[String],
     userSpecifiedSchema: Option[StructType],
     provider: String,
     options: Map[String, String],
@@ -112,12 +112,13 @@ case class CreateMetastoreDataSource(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val tableIdents = tableIdentifier.mkString(".")
 
-    if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table $tableName already exists.")
+        throw new AnalysisException(s"Table $tableIdents already exists.")
       }
     }
 
@@ -125,13 +126,14 @@ case class CreateMetastoreDataSource(
     val optionsWithPath =
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
-        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+        options +
+          ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdents))
       } else {
         options
       }
 
     hiveContext.catalog.createDataSourceTable(
-      tableName,
+      tableIdentifier,
       userSpecifiedSchema,
       provider,
       optionsWithPath,
@@ -143,7 +145,7 @@ case class CreateMetastoreDataSource(
 
 private[hive]
 case class CreateMetastoreDataSourceAsSelect(
-    tableName: String,
+    tableIdentifier: Seq[String],
     provider: String,
     mode: SaveMode,
     options: Map[String, String],
@@ -151,22 +153,24 @@ case class CreateMetastoreDataSourceAsSelect(
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val tableName = tableIdentifier.mkString(".")
     var createMetastoreTable = false
     var isExternal = true
     val optionsWithPath =
       if (!options.contains("path")) {
         isExternal = false
-        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+        options +
+          ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
       } else {
         options
       }
 
     var existingSchema = None: Option[StructType]
 
-    if (sqlContext.catalog.tableExists(Seq(tableName))) {
+    if (sqlContext.catalog.tableExists(tableIdentifier)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
-          throw new AnalysisException(s"Table $tableName already exists. " +
+          throw new AnalysisException(s"Table ${tableIdentifier.mkString(".")} already exists. " +
            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
            s"the existing data. " +
@@ -183,9 +187,11 @@ case class CreateMetastoreDataSourceAsSelect(
           case l @ LogicalRelation(i: InsertableRelation) =>
             if (i != createdRelation.relation) {
               val errorDescription =
-                s"Cannot append to table $tableName because the resolved relation does not " +
-                s"match the existing relation of $tableName. " +
-                s"You can use insertInto($tableName, false) to append this DataFrame to the " +
+                s"Cannot append to table ${tableIdentifier.mkString(".")} " +
+                s"because the resolved relation does not " +
+                s"match the existing relation of ${tableIdentifier.mkString(".")}. " +
" + + s"You can use insertInto(${tableIdentifier.mkString(".")}, false) " + + s"to append this DataFrame to the " + s"table $tableName and using its data source and options." val errorMessage = s""" @@ -204,7 +210,7 @@ case class CreateMetastoreDataSourceAsSelect( throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } case SaveMode.Overwrite => - hiveContext.sql(s"DROP TABLE IF EXISTS $tableName") + hiveContext.sql(s"DROP TABLE IF EXISTS ${tableIdentifier.mkString(".")}") // Need to create the table again. createMetastoreTable = true } @@ -228,7 +234,7 @@ case class CreateMetastoreDataSourceAsSelect( // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). hiveContext.catalog.createDataSourceTable( - tableName, + tableIdentifier, Some(resolved.relation.schema), provider, optionsWithPath, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e09c702c8969..9e2fff9e0da8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -66,6 +66,88 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { jsonFile(filePath).collect().toSeq) } + test ("support db.table") { + // create a new database for test + sql("create database db1") + + // table name: A.B + sql( + s""" + |CREATE TABLE db1.jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM db1.jsonTable"), + jsonFile(filePath).collect().toSeq) + + sql("DROP TABLE db1.jsonTable") + + // default database name is `default` + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM default.jsonTable"), + jsonFile(filePath).collect().toSeq) + + sql("DROP TABLE default.jsonTable") + + // table name: A.B.C... + sql( + s""" + |CREATE TABLE x.y.db1.jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM `x.y.db1.jsonTable`"), + jsonFile(filePath).collect().toSeq) + + sql("DROP TABLE db1.jsonTable") + + // the specified database not exists, throw HiveException + intercept[org.apache.hadoop.hive.ql.metadata.HiveException] { + sql( + s""" + |CREATE TABLE db2.jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + } + + // df save table name: A.B + val originalDefaultSource = conf.defaultDataSourceName + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val df = jsonRDD(rdd) + + conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") + // Save the df as a managed table (by not specifying the path). 
+    df.saveAsTable("db1.savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM db1.savedJsonTable"),
+      df.collect())
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+    sql("DROP TABLE db1.savedJsonTable")
+    sql("DROP DATABASE db1")
+  }
+
   test ("persistent JSON table with a user specified schema") {
     sql(
       s"""
@@ -672,7 +754,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
       // Manually create a metastore data source table.
       catalog.createDataSourceTable(
-        tableName = "wide_schema",
+        tableIdentifier = Seq("wide_schema"),
         userSpecifiedSchema = Some(schema),
         provider = "json",
         options = Map("path" -> "just a dummy path"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d5dd0bf58e70..d74b82557ee1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -393,7 +393,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   }
 
   test("Caching converted data source Parquet Relations") {
-    def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+    def checkCached(tableIdentifer: Seq[String]): Unit = {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
@@ -421,7 +421,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      """.stripMargin)
 
-    var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
+    var tableIdentifer = Seq("default", "test_insert_parquet")
 
     // First, make sure the converted test_parquet is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
@@ -461,7 +461,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      """.stripMargin)
 
-    tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    tableIdentifer = Seq("default", "test_parquet_partitioned_cache_test")
 
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
 
     sql(
       """
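Taken together, and following the new test above, the user-facing effect is roughly the following. This is a hedged sketch, not part of the patch: `sqlContext` is assumed to be a HiveContext, `df` a DataFrame, and `filePath` a JSON file path, as in MetastoreDataSourcesSuite.

    sqlContext.sql("CREATE DATABASE db1")
    sqlContext.sql(
      s"""
        |CREATE TABLE db1.jsonTable
        |USING org.apache.spark.sql.json.DefaultSource
        |OPTIONS (path '$filePath')
      """.stripMargin)
    sqlContext.table("db1.jsonTable").show()   // resolved as database `db1`, table `jsonTable`
    df.saveAsTable("db1.savedJsonTable")       // qualified names also work through saveAsTable
    sqlContext.sql("DROP TABLE db1.savedJsonTable")
    sqlContext.sql("DROP TABLE db1.jsonTable")
    sqlContext.sql("DROP DATABASE db1")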