2 changes: 0 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1556,8 +1556,6 @@ class DataFrame private[sql](
     val files: Seq[String] = logicalPlan.collect {
       case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
         fsBasedRelation.paths.toSeq
-      case LogicalRelation(jsonRelation: JSONRelation) =>
-        jsonRelation.path.toSeq
     }.flatten
     files.toSet.toArray
   }
@@ -237,7 +237,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   def json(jsonRDD: RDD[String]): DataFrame = {
     val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
     sqlContext.baseRelationToDataFrame(
-      new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
+      new JSONRelation(Some(jsonRDD), samplingRatio, userSpecifiedSchema, None, None)(sqlContext))
   }

   /**
@@ -60,6 +60,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
+      t.refresh()
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

       logInfo {
@@ -87,6 +88,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+      t.refresh()
Contributor Author
This is the last chance to refresh the file status; otherwise we may not reflect the latest files under the specified path, since we pass t.paths to create the RDD later on.
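To illustrate the kind of staleness this guards against, here is a rough usage-level sketch (the path and the second writer are hypothetical):

```scala
// Hypothetical scenario: the relation's file listing is cached when the
// DataFrame is first resolved, but new part files arrive before the scan runs.
val path = "/tmp/events"
val df = sqlContext.read.format("json").load(path)

// ... some other job appends new part files under /tmp/events ...

// Without t.refresh() in the strategy, the scan would be planned against the
// stale listing and silently skip the new files; with it, the listing is
// re-read just before the RDD over t.paths is built.
df.count()
```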

       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.execution.datasources

+import java.io.IOException
 import java.util.{Date, UUID}

 import scala.collection.JavaConversions.asScalaIterator
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{Utils, SerializableConfiguration}


 private[sql] case class InsertIntoDataSource(
@@ -102,7 +103,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        fs.delete(qualifiedOutputPath, true)
+        Utils.tryOrIOException {
Contributor Author
Throw an exception if we fail to delete the files; otherwise it will cause a failure in the unit test, which expects an exception to be raised.
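For reference, a rough sketch of what a `tryOrIOException`-style helper does (an illustration of the idea, not the actual `Utils` source):

```scala
import java.io.IOException
import scala.util.control.NonFatal

// Run a block, let IOExceptions propagate as-is, and rewrap any other
// non-fatal error as an IOException so callers see a single failure type
// for filesystem problems. The unit test can then expect an IOException
// whether the delete returns false or the filesystem call itself blows up.
def tryOrIOExceptionSketch[T](block: => T): T = {
  try {
    block
  } catch {
    case e: IOException => throw e
    case NonFatal(e) => throw new IOException(e)
  }
}
```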

+          if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
+            throw new IOException(s"Unable to clear output " +
+              s"directory $qualifiedOutputPath prior to writing to it")
+          }
+        }
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
@@ -101,7 +101,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           }
         }

-      case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
+      case logical.InsertIntoTable(
+        LogicalRelation(r: HadoopFsRelation), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -115,6 +116,17 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }

+        // Get all input data source relations of the query.
+        val srcRelations = query.collect {
Contributor Author
Actually we could do this in another PR, as it checks whether the overwritten table is the same as the source table for HadoopFsRelation-based data sources.

However, this will actually break the JSON unit test if we don't change the code in this PR.
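As a rough illustration of the case this check rejects (the table name, path, and columns are hypothetical):

```scala
// A JSON-backed table is both the source and the target of an INSERT OVERWRITE.
// Overwriting would first delete the very files the query still needs to read,
// so analysis should fail instead of producing an empty or corrupted result.
sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/people.json')
  """.stripMargin)

// Both sides resolve to the same HadoopFsRelation, so PreWriteCheck fails with
// "Cannot insert overwrite into table that is also being read from."
sqlContext.sql("INSERT OVERWRITE TABLE jsonTable SELECT * FROM jsonTable")
```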

+          case LogicalRelation(src: BaseRelation) => src
+        }
+        if (srcRelations.contains(r)) {
+          failAnalysis(
+            "Cannot insert overwrite into table that is also being read from.")
+        } else {
+          // OK
+        }
+
       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")