From 14811636692810809033bc7caf03fcecb6939aa3 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Mon, 11 Dec 2017 20:12:45 +0800
Subject: [PATCH 1/3] Get rid of dataSource.writeAndRead

---
 .../org/apache/spark/sql/DataFrameWriter.scala       |  2 +-
 .../command/InsertIntoDataSourceDirCommand.scala     |  5 +++--
 .../execution/command/createDataSourceTables.scala   | 13 ++++++++++---
 .../sql/execution/datasources/DataSource.scala       | 12 +++++-------
 .../apache/spark/sql/sources/PathOptionSuite.scala   | 13 +++++++++----
 .../spark/sql/util/DataFrameCallbackSuite.scala      | 10 ++++++----
 6 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 59a01e61124f..818c373f6a28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -628,7 +628,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
    * user-registered callback functions.
    */
-  private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
+  def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
     val qe = session.sessionState.executePlan(command)
     try {
       val start = System.nanoTime()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 9e3519073303..eec00d15cd44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -67,8 +67,9 @@ case class InsertIntoDataSourceDirCommand(
 
     val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
     try {
-      sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
-      dataSource.writeAndRead(saveMode, query)
+      Dataset.ofRows(sparkSession, query).write
+        .runCommand(sparkSession, "insertIntoDataSourceDir")(
+          dataSource.planForWriting(saveMode, query))
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 306f43dc4214..39a335784885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -173,7 +173,8 @@ case class CreateDataSourceTableAsSelectCommand(
       table.storage.locationUri
     }
     val result = saveDataIntoTable(
-      sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+      sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false).get
+
     val newTable = table.copy(
       storage = table.storage.copy(locationUri = tableLocation),
       // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -200,7 +201,7 @@ case class CreateDataSourceTableAsSelectCommand(
       tableLocation: Option[URI],
       data: LogicalPlan,
       mode: SaveMode,
-      tableExists: Boolean): BaseRelation = {
+      tableExists: Boolean): Option[BaseRelation] = {
     // Create the relation based on the input logical plan: `data`.
     val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
     val dataSource = DataSource(
@@ -212,7 +213,13 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.writeAndRead(mode, query)
+      Dataset.ofRows(session, query).write
+        .runCommand(session, "saveDataIntoTable")(dataSource.planForWriting(mode, query))
+      if (!tableExists) {
+        Some(dataSource.getRelation(mode, query))
+      } else {
+        None
+      }
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b676672b38cd..8fa647d428b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -490,20 +489,19 @@ case class DataSource(
   }
 
   /**
-   * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
-   * the following reading.
+   * Returns a [[BaseRelation]] for creating table after `planForWriting`. Only use
+   * in `CreateDataSourceTableAsSelectCommand` while saving data to non-existing table.
    */
-  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
+  def getRelation(mode: SaveMode, data: LogicalPlan): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
     providingClass.newInstance() match {
-      case dataSource: CreatableRelationProvider =>
+      case dataSource: RelationProvider =>
         dataSource.createRelation(
-          sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
+          sparkSession.sqlContext, caseInsensitiveOptions)
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
       case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 85da3f0e3846..e44d06117915 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -19,16 +19,14 @@ package org.apache.spark.sql.sources
 
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
 
-class TestOptionsSource extends SchemaRelationProvider with CreatableRelationProvider {
+class TestOptionsSource extends SchemaRelationProvider
+  with CreatableRelationProvider with RelationProvider {
 
   // This is used in the read path.
   override def createRelation(
@@ -46,6 +44,13 @@ class TestOptionsSource extends SchemaRelationProvider with CreatableRelationPro
       data: DataFrame): BaseRelation = {
     new TestOptionsRelation(parameters)(sqlContext.sparkSession)
   }
+
+  // This is used in the write path while table not exist in CTAS scenario.
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    new TestOptionsRelation(parameters)(sqlContext.sparkSession)
+  }
 }
 
 class TestOptionsRelation(val options: Map[String, String])(@transient val session: SparkSession)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index a239e39d9c5a..5918ee5d3cf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -197,10 +197,12 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
 
     withTable("tab") {
       spark.range(10).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("tab")
-      assert(commands.length == 5)
-      assert(commands(4)._1 == "saveAsTable")
-      assert(commands(4)._2.isInstanceOf[CreateTable])
-      assert(commands(4)._2.asInstanceOf[CreateTable].tableDesc.partitionColumnNames == Seq("p"))
+      assert(commands.length == 6)
+      assert(commands(4)._1 == "saveDataIntoTable")
+      assert(commands(4)._2.isInstanceOf[InsertIntoHadoopFsRelationCommand])
+      assert(commands(5)._1 == "saveAsTable")
+      assert(commands(5)._2.isInstanceOf[CreateTable])
+      assert(commands(5)._2.asInstanceOf[CreateTable].tableDesc.partitionColumnNames == Seq("p"))
     }
 
     withTable("tab") {

From 41a2c151c11a0d690626cae2a39623e712aae764 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 14 Dec 2017 22:40:46 +0800
Subject: [PATCH 2/3] Address comments

---
 .../command/InsertIntoDataSourceDirCommand.scala        |  4 +---
 .../spark/sql/execution/datasources/DataSource.scala    |  4 ++--
 .../org/apache/spark/sql/sources/PathOptionSuite.scala  | 10 +---------
 3 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index eec00d15cd44..1dc24b3d221c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -67,9 +67,7 @@ case class InsertIntoDataSourceDirCommand(
 
     val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
     try {
-      Dataset.ofRows(sparkSession, query).write
-        .runCommand(sparkSession, "insertIntoDataSourceDir")(
-          dataSource.planForWriting(saveMode, query))
+      sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query)).toRdd
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 8fa647d428b0..bbd7ed673448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -498,9 +498,9 @@ case class DataSource(
     }
 
     providingClass.newInstance() match {
-      case dataSource: RelationProvider =>
+      case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(
-          sparkSession.sqlContext, caseInsensitiveOptions)
+          sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index e44d06117915..d5604c94b885 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
 
-class TestOptionsSource extends SchemaRelationProvider
-  with CreatableRelationProvider with RelationProvider {
+class TestOptionsSource extends SchemaRelationProvider with CreatableRelationProvider {
 
   // This is used in the read path.
   override def createRelation(
@@ -44,13 +43,6 @@ class TestOptionsSource extends SchemaRelationProvider
       data: DataFrame): BaseRelation = {
     new TestOptionsRelation(parameters)(sqlContext.sparkSession)
   }
-
-  // This is used in the write path while table not exist in CTAS scenario.
-  override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
-    new TestOptionsRelation(parameters)(sqlContext.sparkSession)
-  }
 }
 
 class TestOptionsRelation(val options: Map[String, String])(@transient val session: SparkSession)

From 5221c7cf11dc0accfcd1205177d0332bca042ffc Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Fri, 15 Dec 2017 11:47:06 +0800
Subject: [PATCH 3/3] Revert the changes on DataFrameWriter API

---
 .../org/apache/spark/sql/DataFrameWriter.scala        |  2 +-
 .../execution/command/createDataSourceTables.scala    | 13 +++----------
 .../sql/execution/datasources/DataSource.scala        |  8 +++++---
 .../apache/spark/sql/sources/PathOptionSuite.scala    |  3 +++
 .../spark/sql/util/DataFrameCallbackSuite.scala       | 10 ++++------
 5 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 818c373f6a28..59a01e61124f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -628,7 +628,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
    * user-registered callback functions.
    */
-  def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
+  private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
     val qe = session.sessionState.executePlan(command)
     try {
       val start = System.nanoTime()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 39a335784885..306f43dc4214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -173,8 +173,7 @@ case class CreateDataSourceTableAsSelectCommand(
       table.storage.locationUri
     }
     val result = saveDataIntoTable(
-      sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false).get
-
+      sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
     val newTable = table.copy(
       storage = table.storage.copy(locationUri = tableLocation),
       // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -201,7 +200,7 @@ case class CreateDataSourceTableAsSelectCommand(
       tableLocation: Option[URI],
       data: LogicalPlan,
       mode: SaveMode,
-      tableExists: Boolean): Option[BaseRelation] = {
+      tableExists: Boolean): BaseRelation = {
     // Create the relation based on the input logical plan: `data`.
     val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
     val dataSource = DataSource(
@@ -213,13 +212,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      Dataset.ofRows(session, query).write
-        .runCommand(session, "saveDataIntoTable")(dataSource.planForWriting(mode, query))
-      if (!tableExists) {
-        Some(dataSource.getRelation(mode, query))
-      } else {
-        None
-      }
+      dataSource.writeAndRead(mode, query)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index bbd7ed673448..b676672b38cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -489,10 +490,10 @@ case class DataSource(
   }
 
   /**
-   * Returns a [[BaseRelation]] for creating table after `planForWriting`. Only use
-   * in `CreateDataSourceTableAsSelectCommand` while saving data to non-existing table.
+   * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
    */
-  def getRelation(mode: SaveMode, data: LogicalPlan): BaseRelation = {
+  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -502,6 +503,7 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
+        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
       case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index d5604c94b885..85da3f0e3846 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.sources
 
 import java.net.URI
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index 5918ee5d3cf1..a239e39d9c5a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -197,12 +197,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
 
     withTable("tab") {
       spark.range(10).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("tab")
-      assert(commands.length == 6)
-      assert(commands(4)._1 == "saveDataIntoTable")
-      assert(commands(4)._2.isInstanceOf[InsertIntoHadoopFsRelationCommand])
-      assert(commands(5)._1 == "saveAsTable")
-      assert(commands(5)._2.isInstanceOf[CreateTable])
-      assert(commands(5)._2.asInstanceOf[CreateTable].tableDesc.partitionColumnNames == Seq("p"))
+      assert(commands.length == 5)
+      assert(commands(4)._1 == "saveAsTable")
+      assert(commands(4)._2.isInstanceOf[CreateTable])
+      assert(commands(4)._2.asInstanceOf[CreateTable].tableDesc.partitionColumnNames == Seq("p"))
     }
 
     withTable("tab") {