@@ -88,6 +88,13 @@ statement
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
| createTableHeader
LIKE sourceFormat=STRING sourceLocation=STRING
(COMMENT comment=STRING)?
(PARTITIONED BY '(' partitionColumns=colTypeList ')')?
bucketSpec? skewSpec?
rowFormat? createFileFormat? locationSpec?
(TBLPROPERTIES tablePropertyList)? #createTableLikeFile
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq)? #analyze
| ALTER TABLE tableIdentifier
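For reference, the new #createTableLikeFile alternative is meant to accept statements of the shape shown above. A minimal usage sketch in Scala, assuming a running SparkSession named spark and a placeholder Parquet file path (neither is part of this patch):

// Hypothetical invocation of the new DDL; table names and paths are illustrative only.
spark.sql(
  """CREATE TABLE IF NOT EXISTS db1.events_like_parquet
    |LIKE 'parquet' '/tmp/events.parquet'
    |LOCATION '/tmp/warehouse/events_like_parquet'
  """.stripMargin)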
@@ -24,6 +24,10 @@ import scala.collection.JavaConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.tree.TerminalNode

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
@@ -33,6 +37,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructType

@@ -1195,6 +1200,101 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
}

/**
* Create a Hive serde table whose schema is inferred from an existing Parquet file,
* returning a [[CreateTable]] logical plan.
*
* Expected format:
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* LIKE 'parquet' 'parquet-file-location'
* [COMMENT table_comment]
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
* [ROW FORMAT row_format]
* [STORED AS file_format]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitCreateTableLikeFile(ctx: CreateTableLikeFileContext): LogicalPlan =
withOrigin(ctx) {

val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
// TODO: implement temporary tables
if (temp) {
throw new ParseException(
"CREATE TEMPORARY TABLE is not supported yet. " +
"Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
}
if (ctx.skewSpec != null) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
}

val sourceFileFormat = string(ctx.sourceFormat)
if (!sourceFileFormat.equalsIgnoreCase("parquet")) {
operationNotAllowed("CREATE TABLE ... LIKE File only supports parquet", ctx)
}
val sourceFileLocation = string(ctx.sourceLocation)

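// Read the footer of the source Parquet file; its schema supplies the table's data columns.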
val hadoopConf = new Configuration()
val metaData = ParquetFileReader.readFooter(hadoopConf, new Path(sourceFileLocation))
val parquetSchema = metaData.getFileMetaData.getSchema

val dataCols: StructType =
ParquetSchemaConverter.sqlParquetSchemaConverter.convert(parquetSchema)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
.getOrElse(Map.empty)
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
val schema = StructType(dataCols ++ partitionCols)

// Storage format
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(CatalogStorageFormat.empty)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
.getOrElse(CatalogStorageFormat.empty)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
// If we are creating an EXTERNAL table, then the LOCATION field is required
if (external && location.isEmpty) {
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
}

val locUri = location.map(CatalogUtils.stringToURI(_))
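// Merge explicit ROW FORMAT / STORED AS settings with the Hive defaults; explicit clauses win.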
val storage = CatalogStorageFormat(
locationUri = locUri,
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
compressed = false,
properties = rowStorage.properties ++ fileStorage.properties)
// If location is defined, we'll assume this is an external table.
// Otherwise, we may accidentally delete existing data.
val tableType = if (external || location.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}

// TODO support the sql text - have a proper location for this!
val tableDesc = CatalogTable(
identifier = name,
tableType = tableType,
storage = storage,
schema = schema,
bucketSpec = bucketSpec,
provider = Some(DDLUtils.HIVE_PROVIDER),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
comment = Option(ctx.comment).map(string))

val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
}

/**
* Create a [[CreateTableLikeCommand]] command.
*
@@ -600,4 +600,6 @@ private[sql] object ParquetSchemaConverter {
Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes
.asInstanceOf[Int]
}
}

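/** Returns a converter from a Parquet `MessageType` to a Spark SQL `StructType`. */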
def sqlParquetSchemaConverter: ParquetSchemaConverter = new ParquetSchemaConverter()
}
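The builder change above hinges on two steps: reading the Parquet footer with ParquetFileReader.readFooter and converting the resulting MessageType via the converter exposed here. A standalone sketch of that flow, assuming a placeholder file path; because the converter object is private[sql], such code would have to live under the org.apache.spark.sql package:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.types.StructType

// Read the footer of an existing Parquet file and derive a Spark schema from it.
val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/tmp/events.parquet"))
val parquetSchema = footer.getFileMetaData.getSchema
val sparkSchema: StructType =
  ParquetSchemaConverter.sqlParquetSchemaConverter.convert(parquetSchema)
println(sparkSchema.treeString)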
@@ -17,11 +17,11 @@

package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.util.Locale

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -1631,6 +1631,38 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
comparePlans(parsed, expected)
}

test("create table like parquet") {

val f = getClass.getClassLoader.
getResource("test-data/dec-in-fixed-len.parquet").getPath
val v1 =
"""
|create table if not exists db1.table1 like 'parquet'
""".stripMargin.concat("'" + f + "'").concat(
"""
|stored as sequencefile
|location '/tmp/table1'
""".stripMargin
)

val (desc, allowExisting) = extractTableDesc(v1)

assert(allowExisting)
assert(desc.identifier.database == Some("db1"))
assert(desc.identifier.table == "table1")
assert(desc.tableType == CatalogTableType.EXTERNAL)
assert(desc.schema == new StructType()
.add("fixed_len_dec", "decimal(10,2)"))
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
assert(desc.viewDefaultDatabase.isEmpty)
assert(desc.viewQueryColumnNames.isEmpty)
assert(desc.storage.locationUri == Some(new URI("/tmp/table1")))
assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.SequenceFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
}
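// A hypothetical companion test (a sketch, not part of this patch) covering the negative
// path of visitCreateTableLikeFile: non-parquet formats are rejected via operationNotAllowed.
// It assumes the suite's existing `parser` and an in-scope ParseException; the message check
// depends on the final wording of the error.
test("create table like non-parquet file is rejected") {
val e = intercept[ParseException] {
parser.parsePlan("create table t1 like 'orc' '/tmp/some-file.orc'")
}
assert(e.getMessage.toLowerCase(Locale.ROOT).contains("only supports parquet"))
}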

test("create table like") {
val v1 = "CREATE TABLE table1 LIKE table2"
val (target, source, location, exists) = parser.parsePlan(v1).collect {