@@ -453,9 +453,9 @@ abstract class HadoopFsRelation private[sql](
      val jobConf = new JobConf(hadoopConf, this.getClass())
      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
      if (pathFilter != null) {
-       Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
+       fs.listStatus(qualified, pathFilter)
      } else {
-       Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+       fs.listStatus(qualified)
      }
    }.filterNot { status =>
      val name = status.getPath.getName
@@ -903,7 +903,7 @@ private[sql] object HadoopFsRelation extends Logging {
        val hdfsPath = new Path(path)
        val fs = hdfsPath.getFileSystem(serializableConfiguration.value)
        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-       Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty)
+       listLeafFiles(fs, fs.getFileStatus(qualified))
      }.map { status =>
        FakeFileStatus(
          status.getPath.toString,
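The point of both hunks above is that listing failures are no longer swallowed into an empty result. Below is a minimal, self-contained Scala sketch of that behavioral difference; listDir is a stand-in for the real Hadoop listing call and is not code from this patch.

import java.io.FileNotFoundException
import scala.util.Try

object SwallowVsPropagate {
  // Stand-in for fs.listStatus on a path that does not exist.
  def listDir(path: String): Array[String] =
    throw new FileNotFoundException(s"$path does not exist")

  def main(args: Array[String]): Unit = {
    // Old behavior: the failure is silently converted into an empty listing,
    // so a missing path is indistinguishable from an empty directory.
    val swallowed = Try(listDir("/no/such/path")).getOrElse(Array.empty[String])
    println(swallowed.length) // prints 0; the error is lost

    // New behavior: the exception propagates, so the caller (and ultimately
    // the user) sees that the path does not exist.
    listDir("/no/such/path") // throws FileNotFoundException
  }
}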
@@ -1757,7 +1757,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
    val e3 = intercept[AnalysisException] {
      sql("select * from json.invalid_file")
    }
-   assert(e3.message.contains("No input paths specified"))
+   assert(e3.message.contains("invalid_file does not exist"))
  }

  test("SortMergeJoin returns wrong results when using UnsafeRows") {
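The expected message changes because a nonexistent input path now fails up front with a "does not exist" error during path resolution, instead of silently yielding an empty file list that only later trips the "No input paths specified" check.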
@@ -273,6 +273,22 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
        serdeProperties = options)
    }

+   def hasPartitionColumns(relation: HadoopFsRelation): Boolean = {
+     try {
+       // HACK for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in
+       // file listing" https://github.com/apache/spark/pull/14139
+       // Calling relation.partitionColumns will trigger a refresh of the HadoopFsRelation,
+       // which validates the input paths. However, when we create an empty table, the
+       // table's directory has not been created yet, which causes a FileNotFoundException.
+       // So, here we catch the FileNotFoundException and return false.
+       relation.partitionColumns.nonEmpty
+     } catch {
+       case _: java.io.FileNotFoundException =>
+         false
+     }
+   }

Reviewer (Contributor): I'd add to the comment that this is a hack for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in file listing".

Author: Done.

Author: This function is equivalent to val resolvedRelation = dataSource.resolveRelation(checkPathExist = false) in 2.0 (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L427).
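For illustration, the exception being caught here comes directly from the Hadoop FileSystem API: listing or stat-ing a path that has not been created yet throws java.io.FileNotFoundException. The sketch below shows the same catch-and-default pattern against the local filesystem; it mirrors the intent of hasPartitionColumns but is not the actual Spark code path, and the names are made up.

import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MissingTableDirSketch {
  // Treat a directory that does not exist yet as "no entries" instead of an error.
  def hasEntries(fs: FileSystem, dir: Path): Boolean = {
    try {
      fs.listStatus(dir).nonEmpty
    } catch {
      case _: FileNotFoundException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // A freshly created, empty data source table points at a directory
    // like this one, which has not been created yet.
    println(hasEntries(fs, new Path("/tmp/not-created-yet"))) // false
  }
}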


    def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
      def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
        schema.map { field =>
@@ -284,12 +300,18 @@ }
      }

      assert(partitionColumns.isEmpty)
-     assert(relation.partitionColumns.isEmpty)
+     assert(!hasPartitionColumns(relation))

      HiveTable(
        specifiedDatabase = Option(dbName),
        name = tblName,
-       schema = schemaToHiveColumn(relation.schema),
+       // HACK for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in
+       // file listing" https://github.com/apache/spark/pull/14139
+       // Since the table is not partitioned, we use dataSchema instead of schema.
+       // Computing schema would trigger partition discovery on a path that may not
+       // have been created yet, causing a FileNotFoundException.
+       schema = schemaToHiveColumn(relation.dataSchema),
        partitionColumns = Nil,
        tableType = tableType,
        properties = tableProperties.toMap,
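Background on the dataSchema substitution, stated as an assumption about this codebase rather than something shown in the hunk: a HadoopFsRelation's schema is essentially its dataSchema with the discovered partition columns appended, so computing it forces partition discovery, while for an unpartitioned table the two contain the same columns. A rough sketch of that composition; the field names are made up.

import org.apache.spark.sql.types._

object SchemaCompositionSketch {
  def main(args: Array[String]): Unit = {
    // Columns stored in the data files themselves.
    val dataSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("value", StringType)))

    // Columns discovered from the directory layout (e.g. .../year=2016/...);
    // discovering them requires listing the table path.
    val partitionColumns = StructType(Seq(StructField("year", IntegerType)))

    // The full relation schema is, roughly, data columns plus partition columns.
    val fullSchema = StructType(dataSchema.fields ++ partitionColumns.fields)
    println(fullSchema.simpleString)

    // For an unpartitioned table, partitionColumns is empty, so the full schema
    // and dataSchema are identical and no listing is needed.
  }
}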
@@ -312,14 +334,14 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
        (None, message)

      case (Some(serde), relation: HadoopFsRelation)
-         if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+         if relation.paths.length == 1 && !hasPartitionColumns(relation) =>
        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
        val message =
          s"Persisting data source relation $qualifiedTableName with a single input path " +
            s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}."
        (Some(hiveTable), message)

-     case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
+     case (Some(serde), relation: HadoopFsRelation) if hasPartitionColumns(relation) =>
        val message =
          s"Persisting partitioned data source relation $qualifiedTableName into " +
            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
@@ -24,13 +24,13 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.util.Utils

/**
@@ -696,19 +696,34 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
    }
  }

+ test("a table with an invalid path can be still dropped") {
+   val schema = StructType(StructField("int", IntegerType, true) :: Nil)
+   val tableIdent = TableIdentifier("test_drop_table_with_invalid_path")
+   catalog.createDataSourceTable(
+     tableIdent = tableIdent,
+     userSpecifiedSchema = Some(schema),
+     partitionColumns = Array.empty[String],
+     provider = "json",
+     options = Map("path" -> "an invalid path"),
+     isExternal = false)
+
+   sql("DROP TABLE test_drop_table_with_invalid_path")
+ }

  test("SPARK-6024 wide schema support") {
    withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
      withTable("wide_schema") {
        // We will need 80 splits for this schema if the threshold is 4000.
        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))

+       val tableIdent = TableIdentifier("wide_schema")
+       val path = catalog.hiveDefaultTableFilePath(tableIdent)
        // Manually create a metastore data source table.
        catalog.createDataSourceTable(
-         tableIdent = TableIdentifier("wide_schema"),
+         tableIdent = tableIdent,
          userSpecifiedSchema = Some(schema),
          partitionColumns = Array.empty[String],
          provider = "json",
-         options = Map("path" -> "just a dummy path"),
+         options = Map("path" -> path),
          isExternal = false)

        invalidateTable("wide_schema")
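In the wide-schema test, the path option switches from the dummy string to the table's default location for the same underlying reason: now that listing failures surface instead of being swallowed, the table presumably needs to point at a path that can actually exist once the managed table is created.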