@@ -156,9 +156,9 @@ case class CatalogTable(
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
serde: Option[String] = storage.serde,
- serdeProperties: Map[String, String] = storage.properties): CatalogTable = {
+ properties: Map[String, String] = storage.properties): CatalogTable = {
Contributor Author

We have renamed CatalogStorageFormat.serdeProperties to properties; this should also be updated.

copy(storage = CatalogStorageFormat(
- locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+ locationUri, inputFormat, outputFormat, serde, compressed, properties))
}

override def toString: String = {
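For context, the caller-side effect of this rename looks roughly like the following (a minimal sketch; the table value and the "path" entry are placeholders, while the parameter names come from the hunks below):

// Before this PR: storage options were passed via the old parameter name.
val updatedOld = table.withNewStorage(
  serdeProperties = table.storage.properties ++ Map("path" -> "/placeholder/location"))

// After this PR: the parameter follows the CatalogStorageFormat.serdeProperties -> properties rename.
val updatedNew = table.withNewStorage(
  properties = table.storage.properties ++ Map("path" -> "/placeholder/location"))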
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.command

import org.apache.spark.sql._
- import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -56,12 +55,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}

- val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
- table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.properties
- }

// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val dataSource: BaseRelation =
@@ -70,7 +63,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
className = table.provider.get,
bucketSpec = table.bucketSpec,
- options = optionsWithPath).resolveRelation(checkPathExist = false)
+ options = table.storage.properties).resolveRelation()

+ dataSource match {
+ case fs: HadoopFsRelation =>
+ if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+ throw new AnalysisException(
+ "Cannot create a file-based external data source table without path")
+ }
+ case _ =>
+ }

val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
@@ -83,6 +85,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}

+ val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+ table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.properties
+ }

val newTable = table.copy(
storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
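For reference, the scenario rejected by the new HadoopFsRelation check above is the one exercised by the CatalogSuite test later in this diff; a minimal sketch (the table name and schema are placeholders):

import org.apache.spark.sql.types.{IntegerType, StructType}

// A file-based external table with an explicit schema but no "path" option now fails with:
// org.apache.spark.sql.AnalysisException:
//   Cannot create a file-based external data source table without path
spark.catalog.createExternalTable(
  "tbl", "json", new StructType().add("i", IntegerType), Map.empty[String, String])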
@@ -318,7 +318,7 @@ case class AlterTableSerDePropertiesCommand(
if (partSpec.isEmpty) {
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
- serdeProperties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+ properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
@@ -669,7 +669,7 @@ case class AlterTableSetLocationCommand(
if (DDLUtils.isDatasourceTable(table)) {
table.withNewStorage(
locationUri = Some(location),
- serdeProperties = table.storage.properties ++ Map("path" -> location))
+ properties = table.storage.properties ++ Map("path" -> location))
} else {
table.withNewStorage(locationUri = Some(location))
}
@@ -181,7 +181,7 @@ case class AlterTableRenameCommand(
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
val newPath = catalog.defaultTablePath(newTblName)
val newTable = table.withNewStorage(
- serdeProperties = table.storage.properties ++ Map("path" -> newPath))
+ properties = table.storage.properties ++ Map("path" -> newPath))
catalog.alterTable(newTable)
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
@@ -315,12 +315,8 @@ case class DataSource(
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
- *
- * @param checkPathExist A flag to indicate whether to check the existence of path or not.
- * This flag will be set to false when we create an empty table (the
- * path of the table does not exist).
*/
- def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
+ def resolveRelation(): BaseRelation = {
Contributor

Checked with Wenchen: it is not safe to skip calling resolveRelation() when it is a managed table.

Contributor

For example, if it is a JDBC relation provider, we will call dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) to do some extra checks:

  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
      // TODO: Throw when too much is given.
      case (dataSource: SchemaRelationProvider, Some(schema)) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
      case (dataSource: RelationProvider, None) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)

Member @gatorsmile, Sep 2, 2016

@clockfly Sorry, I did not get your point. What you said above is only for the read path, right? The changes we made here are for the write path.

Member

FYI, today I just updated the write path for the JDBC connection: #14077

Contributor

@gatorsmile I mean the write path.

When createRelation() is called on a RelationProvider, the provider may do some extra checks to make sure the options provided are valid. We'd better enforce those checks when trying to create a managed table.

For example, JdbcRelationProvider will validate the options:

class JdbcRelationProvider extends RelationProvider with DataSourceRegister {

  override def shortName(): String = "jdbc"

  /** Returns a new base relation with the given parameters. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val jdbcOptions = new JDBCOptions(parameters)
    if (jdbcOptions.partitionColumn != null
      && (jdbcOptions.lowerBound == null
        || jdbcOptions.upperBound == null
        || jdbcOptions.numPartitions == null)) {
      sys.error("Partitioning incompletely specified")
    }

    val partitionInfo = if (jdbcOptions.partitionColumn == null) {
      null
    } else {
      JDBCPartitioningInfo(
        jdbcOptions.partitionColumn,
        jdbcOptions.lowerBound.toLong,
        jdbcOptions.upperBound.toLong,
        jdbcOptions.numPartitions.toInt)
    }
    val parts = JDBCRelation.columnPartition(partitionInfo)
    val properties = new Properties() // Additional properties that we will pass to getConnection
    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
    JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
  }
}

Contributor Author @cloud-fan, Sep 2, 2016

What I said before is wrong: a managed table still needs to call resolveRelation to do some validation, because the data source may not be file-based but something else. From the code:

 def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
      // TODO: Throw when too much is given.
      case (dataSource: SchemaRelationProvider, Some(schema)) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
      case (dataSource: RelationProvider, None) =>
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
...

dataSource.createRelation may do some custom checking, and we can't assume it is unnecessary for managed tables.

Member

When a data source wants to implement a write path (the save API), it needs to extend the trait CreatableRelationProvider. That is what my PR #14077 does.
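For illustration, a minimal sketch of such a provider (a hypothetical class, not part of this PR):

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

class MyWritableProvider extends CreatableRelationProvider {
  // Invoked on the write path, e.g. df.write.format("my-source").save().
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // Persist `data` according to `parameters` and `mode`, then return a relation over it.
    ???
  }
}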

Member

Based on my understanding, resolveRelation is not invoked by the write path of non-file-based data sources.

Member

After a discussion with Wenchen: resolveRelation will be invoked by CREATE TABLE ... USING ..., although the write path in the DataFrameWriter APIs does not invoke it. Thanks! @clockfly
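To make the distinction concrete, a rough sketch of the two paths being discussed (the table name, JDBC options, and df are placeholders, not from this PR):

// CREATE TABLE ... USING goes through CreateDataSourceTableCommand, which calls
// DataSource.resolveRelation(), so RelationProvider.createRelation can validate the options.
spark.sql("CREATE TABLE t USING jdbc OPTIONS (url 'jdbc:...', dbtable 'people')")

// The DataFrameWriter save path does not call resolveRelation; for a data source that
// supports writes it goes through CreatableRelationProvider.createRelation instead (#14077).
df.write.format("jdbc").option("url", "jdbc:...").option("dbtable", "people").save()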

Member

To clarify, RelationProvider is not only for the read path:

val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
@@ -367,11 +363,11 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)

- if (checkPathExist && globPath.isEmpty) {
+ if (globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
- if (checkPathExist && !fs.exists(globPath.head)) {
+ if (!fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
@@ -391,7 +387,7 @@

val fileCatalog =
new ListingFileCatalog(
- sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
+ sparkSession, globbedPaths, options, partitionSchema)

val dataSchema = userSpecifiedSchema.map { schema =>
val equality = sparkSession.sessionState.conf.resolver
@@ -17,10 +17,7 @@

package org.apache.spark.sql.execution.datasources

- import java.io.FileNotFoundException

import scala.collection.mutable
- import scala.util.Try

import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -37,16 +34,12 @@ import org.apache.spark.sql.types.StructType
* @param paths a list of paths to scan
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
- * @param ignoreFileNotFound if true, return empty file list when encountering a
- * [[FileNotFoundException]] in file listing. Note that this is a hack
- * for SPARK-16313. We should get rid of this flag in the future.
*/
class ListingFileCatalog(
sparkSession: SparkSession,
override val paths: Seq[Path],
parameters: Map[String, String],
- partitionSchema: Option[StructType],
- ignoreFileNotFound: Boolean = false)
+ partitionSchema: Option[StructType])
Contributor Author

Haha, this too! @yhuai, are there any more places we can clean up?

extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {

@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -88,7 +81,7 @@ class ListingFileCatalog(
*/
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
+ HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
} else {
// Right now, the number of paths is less than the value of
// parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
@@ -104,12 +97,7 @@
logTrace(s"Listing $path on driver")

val childStatuses = {
- val stats =
- try {
- fs.listStatus(path)
- } catch {
- case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
- }
+ val stats = fs.listStatus(path)
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
}

@@ -440,8 +440,7 @@ object HadoopFsRelation extends Logging {
def listLeafFilesInParallel(
paths: Seq[Path],
hadoopConf: Configuration,
- sparkSession: SparkSession,
- ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
+ sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")

@@ -462,11 +461,7 @@
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(serializableConfiguration.value)
- try {
- listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
- } catch {
- case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
- }
+ listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
}
}.map { status =>
val blockLocations = status match {
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.test.SharedSQLContext
+ import org.apache.spark.sql.types.{IntegerType, StructType}


/**
@@ -305,6 +306,22 @@ class CatalogSuite
columnFields.foreach { f => assert(columnString.contains(f.toString)) }
}

test("createExternalTable should fail if path is not given for file-based data source") {
val e = intercept[AnalysisException] {
spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
}
assert(e.message.contains("Unable to infer schema"))

val e2 = intercept[AnalysisException] {
spark.catalog.createExternalTable(
"tbl",
"json",
new StructType().add("i", IntegerType),
Map.empty[String, String])
}
assert(e2.message == "Cannot create a file-based external data source table without path")
}

// TODO: add tests for the rest of them

}
@@ -81,7 +81,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
options = table.storage.properties)

LogicalRelation(
- dataSource.resolveRelation(checkPathExist = true),
+ dataSource.resolveRelation(),
catalogTable = Some(table))
}
}
Expand Down