Commit c0ae6bc
[SPARK-17361][SQL] file-based external table without path should not be created
## What changes were proposed in this pull request?

Using the public `Catalog` API, users can create a file-based data source table without giving the path option. Currently such a table is created successfully but fails when read; ideally creation itself should fail. The cause is that when we create a data source table, we resolve the data source relation without validating the path: `resolveRelation(checkPathExist = false)`. Looking back at why this trick (`checkPathExist`) was added: when we call `resolveRelation` for a managed table, we add the path to the data source options even though the path has not been created yet. So why add this not-yet-created path to the data source options in the first place? This PR fixes the problem by adding the path to the options only after calling `resolveRelation`. That lets us remove the `checkPathExist` parameter from `DataSource.resolveRelation` and do some related cleanups.

## How was this patch tested?

Existing tests and a new test in `CatalogSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14921 from cloud-fan/check-path.
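For context, here is a minimal sketch of the user-facing behavior the new check enforces, modeled on the new `CatalogSuite` test; the table name and the surrounding try/catch are illustrative, and a running `SparkSession` named `spark` is assumed:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{IntegerType, StructType}

// Before this patch, creating a file-based external table without a "path"
// option succeeded and only failed later on read; now it fails at creation.
try {
  spark.catalog.createExternalTable(
    "tbl",                                  // illustrative table name
    "json",                                 // file-based data source
    new StructType().add("i", IntegerType), // user-specified schema
    Map.empty[String, String])              // no "path" option supplied
} catch {
  case e: AnalysisException =>
    // Expected: "Cannot create a file-based external data source table without path"
    println(e.getMessage)
}
```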
1 parent 64e826f commit c0ae6bc

File tree: 9 files changed (+48, -44 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 2 deletions

@@ -156,9 +156,9 @@ case class CatalogTable(
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
       serde: Option[String] = storage.serde,
-      serdeProperties: Map[String, String] = storage.properties): CatalogTable = {
+      properties: Map[String, String] = storage.properties): CatalogTable = {
     copy(storage = CatalogStorageFormat(
-      locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+      locationUri, inputFormat, outputFormat, serde, compressed, properties))
   }
 
   override def toString: String = {

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 16 additions & 8 deletions

@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -56,12 +55,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val dataSource: BaseRelation =
@@ -70,7 +63,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = optionsWithPath).resolveRelation(checkPathExist = false)
+        options = table.storage.properties).resolveRelation()
+
+    dataSource match {
+      case fs: HadoopFsRelation =>
+        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+          throw new AnalysisException(
+            "Cannot create a file-based external data source table without path")
+        }
+      case _ =>
+    }
 
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
@@ -83,6 +85,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }
+
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 2 additions & 2 deletions

@@ -318,7 +318,7 @@ case class AlterTableSerDePropertiesCommand(
     if (partSpec.isEmpty) {
       val newTable = table.withNewStorage(
         serde = serdeClassName.orElse(table.storage.serde),
-        serdeProperties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+        properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
       catalog.alterTable(newTable)
     } else {
       val spec = partSpec.get
@@ -669,7 +669,7 @@ case class AlterTableSetLocationCommand(
       if (DDLUtils.isDatasourceTable(table)) {
         table.withNewStorage(
           locationUri = Some(location),
-          serdeProperties = table.storage.properties ++ Map("path" -> location))
+          properties = table.storage.properties ++ Map("path" -> location))
       } else {
         table.withNewStorage(locationUri = Some(location))
       }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 1 addition & 1 deletion

@@ -181,7 +181,7 @@ case class AlterTableRenameCommand(
       if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
         val newPath = catalog.defaultTablePath(newTblName)
         val newTable = table.withNewStorage(
-          serdeProperties = table.storage.properties ++ Map("path" -> newPath))
+          properties = table.storage.properties ++ Map("path" -> newPath))
         catalog.alterTable(newTable)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 4 additions & 8 deletions

@@ -315,12 +315,8 @@ case class DataSource(
   /**
    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
-   *
-   * @param checkPathExist A flag to indicate whether to check the existence of path or not.
-   *                       This flag will be set to false when we create an empty table (the
-   *                       path of the table does not exist).
    */
-  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
+  def resolveRelation(): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -367,11 +363,11 @@
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
-        if (checkPathExist && globPath.isEmpty) {
+        if (globPath.isEmpty) {
           throw new AnalysisException(s"Path does not exist: $qualified")
         }
         // Sufficient to check head of the globPath seq for non-glob scenario
-        if (checkPathExist && !fs.exists(globPath.head)) {
+        if (!fs.exists(globPath.head)) {
           throw new AnalysisException(s"Path does not exist: ${globPath.head}")
         }
         globPath
@@ -391,7 +387,7 @@
 
       val fileCatalog =
         new ListingFileCatalog(
-          sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
+          sparkSession, globbedPaths, options, partitionSchema)
 
       val dataSchema = userSpecifiedSchema.map { schema =>
         val equality = sparkSession.sessionState.conf.resolver

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala

Lines changed: 3 additions & 15 deletions

@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
-import scala.util.Try
 
 import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -37,16 +34,12 @@ import org.apache.spark.sql.types.StructType
  * @param paths a list of paths to scan
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
- * @param ignoreFileNotFound if true, return empty file list when encountering a
- *                           [[FileNotFoundException]] in file listing. Note that this is a hack
- *                           for SPARK-16313. We should get rid of this flag in the future.
  */
 class ListingFileCatalog(
     sparkSession: SparkSession,
     override val paths: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
-    ignoreFileNotFound: Boolean = false)
+    partitionSchema: Option[StructType])
   extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -88,7 +81,7 @@ class ListingFileCatalog(
   */
  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-     HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
+     HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
    } else {
      // Right now, the number of paths is less than the value of
      // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
@@ -104,12 +97,7 @@
        logTrace(s"Listing $path on driver")
 
        val childStatuses = {
-         val stats =
-           try {
-             fs.listStatus(path)
-           } catch {
-             case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
-           }
+         val stats = fs.listStatus(path)
          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
        }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

Lines changed: 2 additions & 7 deletions

@@ -440,8 +440,7 @@ object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkSession: SparkSession,
-      ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
     assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
@@ -462,11 +461,7 @@ object HadoopFsRelation extends Logging {
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(serializableConfiguration.value)
-        try {
-          listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        } catch {
-          case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
-        }
+        listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
       }
     }.map { status =>
       val blockLocations = status match {

sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 17 additions & 0 deletions

@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 
 /**
@@ -305,6 +306,22 @@ class CatalogSuite
     columnFields.foreach { f => assert(columnString.contains(f.toString)) }
   }
 
+  test("createExternalTable should fail if path is not given for file-based data source") {
+    val e = intercept[AnalysisException] {
+      spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
+    }
+    assert(e.message.contains("Unable to infer schema"))
+
+    val e2 = intercept[AnalysisException] {
+      spark.catalog.createExternalTable(
+        "tbl",
+        "json",
+        new StructType().add("i", IntegerType),
+        Map.empty[String, String])
+    }
+    assert(e2.message == "Cannot create a file-based external data source table without path")
+  }
+
   // TODO: add tests for the rest of them
 
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion

@@ -81,7 +81,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           options = table.storage.properties)
 
         LogicalRelation(
-          dataSource.resolveRelation(checkPathExist = true),
+          dataSource.resolveRelation(),
          catalogTable = Some(table))
       }
     }
