From cce10441125a7fcc334944ca10eed4c83b10c81c Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 15 Feb 2017 20:56:36 +0800 Subject: [PATCH 1/9] [SPARK-19583][SQL]CTAS for data source table with a created location should succeed --- .../command/createDataSourceTables.scala | 9 +- .../execution/datasources/DataSource.scala | 36 ++++++-- .../datasources/DataSourceStrategy.scala | 1 + .../InsertIntoHadoopFsRelationCommand.scala | 82 ++++++++----------- .../sql/hive/execution/HiveDDLSuite.scala | 24 ++++++ 5 files changed, 94 insertions(+), 58 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 5abd57947650..3a47ae1d5dda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -140,8 +140,8 @@ case class CreateDataSourceTableAsSelectCommand( return Seq.empty } - saveDataIntoTable( - sparkSession, table, table.storage.locationUri, query, mode, tableExists = true) + saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode, + overwrite = false, tableExists = true) } else { assert(table.schema.isEmpty) @@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand( table.storage.locationUri } val result = saveDataIntoTable( - sparkSession, table, tableLocation, query, mode, tableExists = false) + sparkSession, table, tableLocation, query, mode, overwrite = true, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of @@ -178,6 +178,7 @@ case class CreateDataSourceTableAsSelectCommand( tableLocation: Option[String], data: LogicalPlan, mode: SaveMode, + overwrite: Boolean, tableExists: Boolean): BaseRelation = { // Create the relation based on the input logical plan: `data`. val pathOption = tableLocation.map("path" -> _) @@ -190,7 +191,7 @@ case class CreateDataSourceTableAsSelectCommand( catalogTable = if (tableExists) Some(table) else None) try { - dataSource.writeAndRead(mode, Dataset.ofRows(session, query)) + dataSource.writeAndRead(mode, Dataset.ofRows(session, query), Some(overwrite)) } catch { case ex: AnalysisException => logError(s"Failed to write to table ${table.identifier.unquotedString}", ex) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d510581f90e6..db415689c6c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -406,21 +406,44 @@ case class DataSource( /** * Writes the given [[DataFrame]] out in this [[FileFormat]]. */ - private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = { + private def writeInFileFormat( + format: FileFormat, + mode: SaveMode, + data: DataFrame, + overwrite: Option[Boolean]): Unit = { // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. Output path must be a legal HDFS style file system path; // 3. 
It's OK that the output path doesn't exist yet; val allPaths = paths ++ caseInsensitiveOptions.get("path") - val outputPath = if (allPaths.length == 1) { + val (outputPath, pathExists) = if (allPaths.length == 1) { val path = new Path(allPaths.head) val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) - path.makeQualified(fs.getUri, fs.getWorkingDirectory) + val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + (qualifiedOutputPath, fs.exists(qualifiedOutputPath)) } else { throw new IllegalArgumentException("Expected exactly one path to be specified, but " + s"got: ${allPaths.mkString(", ")}") } + val isOverWrite = overwrite match { + case Some(ow) => ow + case _ => + if (pathExists) { + if (mode == SaveMode.ErrorIfExists) { + throw new AnalysisException(s"path $outputPath already exists.") + } + if (mode == SaveMode.Ignore) { + // Since the path already exists and the save mode is Ignore, we will just return. + return + } + + if (mode == SaveMode.Append) false + else if (mode == SaveMode.Overwrite) true + else throw new IllegalStateException(s"unsupported save mode $mode") + } else true + } + val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive) @@ -452,6 +475,7 @@ case class DataSource( options = options, query = data.logicalPlan, mode = mode, + isOverWrite, catalogTable = catalogTable, fileIndex = fileIndex) sparkSession.sessionState.executePlan(plan).toRdd @@ -461,7 +485,7 @@ case class DataSource( * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for * the following reading. */ - def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = { + def writeAndRead(mode: SaveMode, data: DataFrame, overwrite: Option[Boolean]): BaseRelation = { if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { throw new AnalysisException("Cannot save interval data type into external storage.") } @@ -470,7 +494,7 @@ case class DataSource( case dataSource: CreatableRelationProvider => dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data) case format: FileFormat => - writeInFileFormat(format, mode, data) + writeInFileFormat(format, mode, data, overwrite) // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() case _ => @@ -490,7 +514,7 @@ case class DataSource( case dataSource: CreatableRelationProvider => dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data) case format: FileFormat => - writeInFileFormat(format, mode, data) + writeInFileFormat(format, mode, data, None) case _ => sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index d8a515828728..cfaadb0edadb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -201,6 +201,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { t.options, actualQuery, mode, + overwrite, table, Some(t.location)) } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 652bcc833193..996519ee683a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -47,6 +47,7 @@ case class InsertIntoHadoopFsRelationCommand( options: Map[String, String], query: LogicalPlan, mode: SaveMode, + overwrite: Boolean, catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex]) extends RunnableCommand { @@ -97,61 +98,46 @@ case class InsertIntoHadoopFsRelationCommand( outputPath = outputPath.toString, isAppend = isAppend) - val doInsertion = (mode, pathExists) match { - case (SaveMode.ErrorIfExists, true) => - throw new AnalysisException(s"path $qualifiedOutputPath already exists.") - case (SaveMode.Overwrite, true) => - deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) - true - case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => - true - case (SaveMode.Ignore, exists) => - !exists - case (s, exists) => - throw new IllegalStateException(s"unsupported save mode $s ($exists)") + if (overwrite) { + deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) } - if (doInsertion) { - - // Callback for updating metastore partition metadata after the insertion job completes. - def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { - if (partitionsTrackedByCatalog) { - val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions - if (newPartitions.nonEmpty) { - AlterTableAddPartitionCommand( - catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)), - ifNotExists = true).run(sparkSession) - } - if (mode == SaveMode.Overwrite) { - val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions - if (deletedPartitions.nonEmpty) { - AlterTableDropPartitionCommand( - catalogTable.get.identifier, deletedPartitions.toSeq, - ifExists = true, purge = false, - retainData = true /* already deleted */).run(sparkSession) - } + // Callback for updating metastore partition metadata after the insertion job completes. 
+ def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { + if (partitionsTrackedByCatalog) { + val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions + if (newPartitions.nonEmpty) { + AlterTableAddPartitionCommand( + catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)), + ifNotExists = true).run(sparkSession) + } + if (mode == SaveMode.Overwrite) { + val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions + if (deletedPartitions.nonEmpty) { + AlterTableDropPartitionCommand( + catalogTable.get.identifier, deletedPartitions.toSeq, + ifExists = true, purge = false, + retainData = true /* already deleted */).run(sparkSession) } } } - - FileFormatWriter.write( - sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, - fileFormat = fileFormat, - committer = committer, - outputSpec = FileFormatWriter.OutputSpec( - qualifiedOutputPath.toString, customPartitionLocations), - hadoopConf = hadoopConf, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - refreshFunction = refreshPartitionsCallback, - options = options) - - fileIndex.foreach(_.refresh()) - } else { - logInfo("Skipping insertion into a relation that already exists.") } + FileFormatWriter.write( + sparkSession = sparkSession, + queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + fileFormat = fileFormat, + committer = committer, + outputSpec = FileFormatWriter.OutputSpec( + qualifiedOutputPath.toString, customPartitionLocations), + hadoopConf = hadoopConf, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + refreshFunction = refreshPartitionsCallback, + options = options) + + fileIndex.foreach(_.refresh()) + Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index c04b9ee0f2cd..df62d38e5ced 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1587,4 +1587,28 @@ class HiveDDLSuite } } } + + test("CTAS for data source table with a created location") { + withTable("t") { + withTempDir { + dir => + spark.sql( + s""" + |CREATE TABLE t + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + + val partDir = new File(dir.getAbsolutePath + "/a=3") + assert(partDir.exists()) + + checkAnswer(spark.table("t"), Row(1, 2, 3, 4)) + } + } + } } From 058865bdac9895c1f810be2dcc3439f2a7d17b70 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 15 Feb 2017 21:12:13 +0800 Subject: [PATCH 2/9] fix a code style --- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index df62d38e5ced..715aa262cc1e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1599,7 +1599,7 @@ class HiveDDLSuite |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as 
a, 4 as b, 1 as c, 2 as d - """.stripMargin) + """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) From 8559e4e8f9b8e8f773f4d336866a01ff15c9fc5e Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 24 Feb 2017 13:55:43 +0800 Subject: [PATCH 3/9] add test in DDLSuit --- .../execution/datasources/DataSource.scala | 4 +++- .../sql/execution/command/DDLSuite.scala | 24 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index db415689c6c6..3a20dd617c1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -440,7 +440,9 @@ case class DataSource( if (mode == SaveMode.Append) false else if (mode == SaveMode.Overwrite) true - else throw new IllegalStateException(s"unsupported save mode $mode") + else { + throw new IllegalStateException(s"unsupported save mode $mode") + } } else true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 278d24725087..d8f5fbb1bc15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1832,4 +1832,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } } + + test("CTAS for data source table with a created location") { + withTable("t") { + withTempDir { + dir => + spark.sql( + s""" + |CREATE TABLE t + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + + val partDir = new File(dir.getAbsolutePath + "/a=3") + assert(partDir.exists()) + + checkAnswer(spark.table("t"), Row(1, 2, 3, 4)) + } + } + } } From 1f2ce17e3d2eca92bc01b6a22e908bd8fd1d9592 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 24 Feb 2017 14:44:27 +0800 Subject: [PATCH 4/9] add more tests --- .../sql/execution/command/DDLSuite.scala | 22 +++++- .../sql/hive/execution/HiveDDLSuite.scala | 70 +++++++++++++++++-- 2 files changed, 85 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f4f75b2fe1b6..52a0058e00b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1954,14 +1954,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("CTAS for data source table with a created location") { - withTable("t") { + withTable("t", "t1") { withTempDir { dir => spark.sql( s""" |CREATE TABLE t |USING parquet - |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) @@ -1969,10 +1968,27 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) + } + // partition table + withTempDir { + dir => + spark.sql( + s""" + |CREATE TABLE t1 + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + val partDir = new File(dir.getAbsolutePath + "/a=3") assert(partDir.exists()) - checkAnswer(spark.table("t"), Row(1, 2, 3, 4)) + checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index bef9b72ac452..60074fbe2f6d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1589,25 +1589,87 @@ class HiveDDLSuite } test("CTAS for data source table with a created location") { - withTable("t") { + withTable("t", "t1") { withTempDir { dir => spark.sql( s""" |CREATE TABLE t |USING parquet - |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) + } + // partition table + withTempDir { + dir => + spark.sql( + s""" + |CREATE TABLE t1 + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION 'file:$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) val partDir = new File(dir.getAbsolutePath + "/a=3") assert(partDir.exists()) - checkAnswer(spark.table("t"), Row(1, 2, 3, 4)) + checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) + } + } + } + + test("CTAS for hive table with a created location") { + withTable("t", "t1") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + withTempDir { + dir => + spark.sql( + s""" + |CREATE TABLE t + |USING hive + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) + } + // partition table + withTempDir { + dir => + spark.sql( + s""" + |CREATE TABLE t1 + |USING hive + |PARTITIONED BY(a, b) + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + val partDir = new File(dir.getAbsolutePath + "/a=3") + assert(partDir.exists()) + + checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) + } } } } From 
5a3e5ac98855fe9f474a6c5e44eab42bee6c3d08 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 24 Feb 2017 15:57:45 +0800 Subject: [PATCH 5/9] modify some code --- .../spark/sql/execution/command/DDLSuite.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 52a0058e00b3..12263d5087e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1985,7 +1985,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) - val partDir = new File(dir.getAbsolutePath + "/a=3") + val partDir = new File(dir, "a=3") assert(partDir.exists()) checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 60074fbe2f6d..29490218cb78 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1601,8 +1601,7 @@ class HiveDDLSuite """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } @@ -1614,15 +1613,14 @@ class HiveDDLSuite |CREATE TABLE t1 |USING parquet |PARTITIONED BY(a, b) - |LOCATION 'file:$dir' + |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) - val partDir = new File(dir.getAbsolutePath + "/a=3") + val partDir = new File(dir, "a=3") assert(partDir.exists()) checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) @@ -1665,7 +1663,7 @@ class HiveDDLSuite val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" assert(table.location.stripSuffix("/") == expectedPath) - val partDir = new File(dir.getAbsolutePath + "/a=3") + val partDir = new File(dir, "a=3") assert(partDir.exists()) checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) From 416ea37b8c85040cff868007f6c5fea55f9b2d16 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 24 Feb 2017 18:24:53 +0800 Subject: [PATCH 6/9] add non-existent test case --- .../sql/execution/command/DDLSuite.scala | 81 +++++++------ .../sql/hive/execution/HiveDDLSuite.scala | 113 ++++++++++-------- 2 files changed, 109 insertions(+), 85 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 12263d5087e0..bc7030c1133d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1953,42 +1953,51 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("CTAS for data source table with a created location") { - withTable("t", "t1") { - withTempDir { - dir => - spark.sql( - s""" - |CREATE TABLE t - |USING parquet - |LOCATION '$dir' - |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d - """.stripMargin) - - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) - - checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) - } - // partition table - withTempDir { - dir => - spark.sql( - s""" - |CREATE TABLE t1 - |USING parquet - |PARTITIONED BY(a, b) - |LOCATION '$dir' - |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d - """.stripMargin) - - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) - - val partDir = new File(dir, "a=3") - assert(partDir.exists()) - - checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) + Seq(true, false).foreach { shouldDelete => + val tcName = if (shouldDelete) "non-existent" else "existed" + test(s"CTAS for external data source table with a $tcName location") { + withTable("t", "t1") { + withTempDir { + dir => + if (shouldDelete) { + dir.delete() + } + spark.sql( + s""" + |CREATE TABLE t + |USING parquet + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + + checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) + } + // partition table + withTempDir { + dir => + if (shouldDelete) { + dir.delete() + } + spark.sql( + s""" + |CREATE TABLE t1 + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + + val partDir = new File(dir, "a=3") + assert(partDir.exists()) + + checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 29490218cb78..24c3dfa83068 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1588,80 +1588,45 @@ class HiveDDLSuite } } - test("CTAS for data source table with a created location") { - withTable("t", "t1") { - withTempDir { - dir => - spark.sql( - s""" - |CREATE TABLE t - |USING parquet - |LOCATION '$dir' - |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d - """.stripMargin) - - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) - - checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) - } - // partition table - withTempDir { - dir => - spark.sql( - s""" - |CREATE TABLE t1 - |USING parquet - |PARTITIONED BY(a, b) - |LOCATION '$dir' - |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d - """.stripMargin) - - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - assert(table.location.stripSuffix("/") == 
dir.getAbsolutePath.stripSuffix("/")) - - val partDir = new File(dir, "a=3") - assert(partDir.exists()) - - checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) - } - } - } - - test("CTAS for hive table with a created location") { - withTable("t", "t1") { - withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + Seq(true, false).foreach { shouldDelete => + val tcName = if (shouldDelete) "non-existent" else "existed" + test(s"CTAS for external data source table with a $tcName location") { + withTable("t", "t1") { withTempDir { dir => + if (shouldDelete) { + dir.delete() + } spark.sql( s""" |CREATE TABLE t - |USING hive + |USING parquet |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } // partition table withTempDir { dir => + if (shouldDelete) { + dir.delete() + } spark.sql( s""" |CREATE TABLE t1 - |USING hive + |USING parquet |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) + assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) val partDir = new File(dir, "a=3") assert(partDir.exists()) @@ -1670,5 +1635,55 @@ class HiveDDLSuite } } } + + test(s"CTAS for external hive table with a $tcName location") { + withTable("t", "t1") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + withTempDir { + dir => + if (shouldDelete) { + dir.delete() + } + spark.sql( + s""" + |CREATE TABLE t + |USING hive + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) + } + // partition table + withTempDir { + dir => + if (shouldDelete) { + dir.delete() + } + spark.sql( + s""" + |CREATE TABLE t1 + |USING hive + |PARTITIONED BY(a, b) + |LOCATION '$dir' + |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + val partDir = new File(dir, "a=3") + assert(partDir.exists()) + + checkAnswer(spark.table("t1"), Row(1, 2, 3, 4)) + } + } + } + } } } From 304ae3112950a80b4ff2a980199c7817c0d0562a Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 28 Feb 2017 20:13:10 +0800 Subject: [PATCH 7/9] remove the overwrite parameter --- .../command/createDataSourceTables.scala | 9 ++-- .../execution/datasources/DataSource.scala | 43 +++++++------------ .../datasources/DataSourceStrategy.scala | 1 - .../InsertIntoHadoopFsRelationCommand.scala | 3 +- 4 files changed, 20 insertions(+), 36 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3a47ae1d5dda..d835b521166a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -140,8 +140,8 @@ case class CreateDataSourceTableAsSelectCommand( return Seq.empty } - saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode, - overwrite = false, tableExists = true) + saveDataIntoTable( + sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true) } else { assert(table.schema.isEmpty) @@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand( table.storage.locationUri } val result = saveDataIntoTable( - sparkSession, table, tableLocation, query, mode, overwrite = true, tableExists = false) + sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of @@ -178,7 +178,6 @@ case class CreateDataSourceTableAsSelectCommand( tableLocation: Option[String], data: LogicalPlan, mode: SaveMode, - overwrite: Boolean, tableExists: Boolean): BaseRelation = { // Create the relation based on the input logical plan: `data`. val pathOption = tableLocation.map("path" -> _) @@ -191,7 +190,7 @@ case class CreateDataSourceTableAsSelectCommand( catalogTable = if (tableExists) Some(table) else None) try { - dataSource.writeAndRead(mode, Dataset.ofRows(session, query), Some(overwrite)) + dataSource.writeAndRead(mode, Dataset.ofRows(session, query)) } catch { case ex: AnalysisException => logError(s"Failed to write to table ${table.identifier.unquotedString}", ex) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 3a20dd617c1b..8c468bb46037 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -406,11 +406,7 @@ case class DataSource( /** * Writes the given [[DataFrame]] out in this [[FileFormat]]. */ - private def writeInFileFormat( - format: FileFormat, - mode: SaveMode, - data: DataFrame, - overwrite: Option[Boolean]): Unit = { + private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = { // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. Output path must be a legal HDFS style file system path; @@ -426,26 +422,18 @@ case class DataSource( s"got: ${allPaths.mkString(", ")}") } - val isOverWrite = overwrite match { - case Some(ow) => ow - case _ => - if (pathExists) { - if (mode == SaveMode.ErrorIfExists) { - throw new AnalysisException(s"path $outputPath already exists.") - } - if (mode == SaveMode.Ignore) { - // Since the path already exists and the save mode is Ignore, we will just return. 
- return - } - - if (mode == SaveMode.Append) false - else if (mode == SaveMode.Overwrite) true - else { - throw new IllegalStateException(s"unsupported save mode $mode") - } - } else true + if (pathExists) { + if (mode == SaveMode.ErrorIfExists) { + throw new AnalysisException(s"path $outputPath already exists.") + } + if (mode == SaveMode.Ignore) { + // Since the path already exists and the save mode is Ignore, we will just return. + return + } } + // if path does not exist, the ErrorIfExists and Ignore can be transformed to Append + val transformedMode = if (mode != SaveMode.Overwrite) SaveMode.Append else mode val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive) @@ -476,8 +464,7 @@ case class DataSource( fileFormat = format, options = options, query = data.logicalPlan, - mode = mode, - isOverWrite, + mode = transformedMode, catalogTable = catalogTable, fileIndex = fileIndex) sparkSession.sessionState.executePlan(plan).toRdd @@ -487,7 +474,7 @@ case class DataSource( * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for * the following reading. */ - def writeAndRead(mode: SaveMode, data: DataFrame, overwrite: Option[Boolean]): BaseRelation = { + def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = { if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { throw new AnalysisException("Cannot save interval data type into external storage.") } @@ -496,7 +483,7 @@ case class DataSource( case dataSource: CreatableRelationProvider => dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data) case format: FileFormat => - writeInFileFormat(format, mode, data, overwrite) + writeInFileFormat(format, mode, data) // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() case _ => @@ -516,7 +503,7 @@ case class DataSource( case dataSource: CreatableRelationProvider => dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data) case format: FileFormat => - writeInFileFormat(format, mode, data, None) + writeInFileFormat(format, mode, data) case _ => sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4d204f4b185c..f4292320e4bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -201,7 +201,6 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { t.options, actualQuery, mode, - overwrite, table, Some(t.location)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 996519ee683a..ca7aa85cb1d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -47,7 +47,6 @@ case class 
InsertIntoHadoopFsRelationCommand( options: Map[String, String], query: LogicalPlan, mode: SaveMode, - overwrite: Boolean, catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex]) extends RunnableCommand { @@ -98,7 +97,7 @@ case class InsertIntoHadoopFsRelationCommand( outputPath = outputPath.toString, isAppend = isAppend) - if (overwrite) { + if (mode == SaveMode.Overwrite) { deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) } From a8dbccaff206df4773541ffff527f00e340a5ce3 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 1 Mar 2017 10:49:07 +0800 Subject: [PATCH 8/9] revert savamode check --- .../execution/datasources/DataSource.scala | 19 +---- .../InsertIntoHadoopFsRelationCommand.scala | 84 +++++++++++-------- 2 files changed, 54 insertions(+), 49 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 8c468bb46037..d510581f90e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -412,28 +412,15 @@ case class DataSource( // 2. Output path must be a legal HDFS style file system path; // 3. It's OK that the output path doesn't exist yet; val allPaths = paths ++ caseInsensitiveOptions.get("path") - val (outputPath, pathExists) = if (allPaths.length == 1) { + val outputPath = if (allPaths.length == 1) { val path = new Path(allPaths.head) val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) - val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) - (qualifiedOutputPath, fs.exists(qualifiedOutputPath)) + path.makeQualified(fs.getUri, fs.getWorkingDirectory) } else { throw new IllegalArgumentException("Expected exactly one path to be specified, but " + s"got: ${allPaths.mkString(", ")}") } - if (pathExists) { - if (mode == SaveMode.ErrorIfExists) { - throw new AnalysisException(s"path $outputPath already exists.") - } - if (mode == SaveMode.Ignore) { - // Since the path already exists and the save mode is Ignore, we will just return. 
- return - } - } - - // if path does not exist, the ErrorIfExists and Ignore can be transformed to Append - val transformedMode = if (mode != SaveMode.Overwrite) SaveMode.Append else mode val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive) @@ -464,7 +451,7 @@ case class DataSource( fileFormat = format, options = options, query = data.logicalPlan, - mode = transformedMode, + mode = mode, catalogTable = catalogTable, fileIndex = fileIndex) sparkSession.sessionState.executePlan(plan).toRdd diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index ca7aa85cb1d3..19b51d4d9530 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -97,45 +97,63 @@ case class InsertIntoHadoopFsRelationCommand( outputPath = outputPath.toString, isAppend = isAppend) - if (mode == SaveMode.Overwrite) { - deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + val doInsertion = (mode, pathExists) match { + case (SaveMode.ErrorIfExists, true) => + throw new AnalysisException(s"path $qualifiedOutputPath already exists.") + case (SaveMode.Overwrite, true) => + deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + true + case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => + true + case (SaveMode.Ignore, exists) => + !exists + case (s, exists) => + throw new IllegalStateException(s"unsupported save mode $s ($exists)") } - // Callback for updating metastore partition metadata after the insertion job completes. - def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { - if (partitionsTrackedByCatalog) { - val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions - if (newPartitions.nonEmpty) { - AlterTableAddPartitionCommand( - catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)), - ifNotExists = true).run(sparkSession) - } - if (mode == SaveMode.Overwrite) { - val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions - if (deletedPartitions.nonEmpty) { - AlterTableDropPartitionCommand( - catalogTable.get.identifier, deletedPartitions.toSeq, - ifExists = true, purge = false, - retainData = true /* already deleted */).run(sparkSession) + if (doInsertion) { + + // Callback for updating metastore partition metadata after the insertion job completes. 
+ def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { + if (partitionsTrackedByCatalog) { + val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions + if (newPartitions.nonEmpty) { + AlterTableAddPartitionCommand( + catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)), + ifNotExists = true).run(sparkSession) + } + if (mode == SaveMode.Overwrite) { + val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions + if (deletedPartitions.nonEmpty) { + AlterTableDropPartitionCommand( + catalogTable.get.identifier, deletedPartitions.toSeq, + ifExists = true, purge = false, + retainData = true /* already deleted */).run(sparkSession) + } } } } - } - FileFormatWriter.write( - sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, - fileFormat = fileFormat, - committer = committer, - outputSpec = FileFormatWriter.OutputSpec( - qualifiedOutputPath.toString, customPartitionLocations), - hadoopConf = hadoopConf, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - refreshFunction = refreshPartitionsCallback, - options = options) - - fileIndex.foreach(_.refresh()) + FileFormatWriter.write( + sparkSession = sparkSession, + queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + fileFormat = fileFormat, + committer = committer, + outputSpec = FileFormatWriter.OutputSpec( + qualifiedOutputPath.toString, customPartitionLocations), + hadoopConf = hadoopConf, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + refreshFunction = refreshPartitionsCallback, + options = options) + + // refresh cached files in FileIndex + fileIndex.foreach(_.refresh()) + // refresh data cache if table is cached + sparkSession.catalog.refreshByPath(outputPath.toString) + } else { + logInfo("Skipping insertion into a relation that already exists.") + } Seq.empty[Row] } From d78b7d5f0dfd45661e30a90c1cabf7a30278eb3b Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 1 Mar 2017 16:48:54 +0800 Subject: [PATCH 9/9] modify hacky code --- .../sql/execution/command/DDLSuite.scala | 25 ++++++++----------- .../sql/hive/execution/HiveDDLSuite.scala | 16 ++++++------ 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index bc7030c1133d..8b8cd0fdf4db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1836,18 +1836,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("insert data to a data source table which has a not existed location should succeed") { withTable("t") { withTempDir { dir => - val path = dir.toURI.toString.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a string, b int) |USING parquet - |OPTIONS(path "$path") + |OPTIONS(path "$dir") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == path) + assert(table.location == dir.getAbsolutePath) dir.delete - val tableLocFile = new File(table.location.stripPrefix("file:")) + val tableLocFile = new File(table.location) assert(!tableLocFile.exists) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") assert(tableLocFile.exists) @@ -1878,16 +1877,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach 
{ test("insert into a data source table with no existed partition location should succeed") { withTable("t") { withTempDir { dir => - val path = dir.toURI.toString.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a int, b int, c int, d int) |USING parquet |PARTITIONED BY(a, b) - |LOCATION "$path" + |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == path) + assert(table.location == dir.getAbsolutePath) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) @@ -1906,15 +1904,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("read data from a data source table which has a not existed location should succeed") { withTable("t") { withTempDir { dir => - val path = dir.toURI.toString.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a string, b int) |USING parquet - |OPTIONS(path "$path") + |OPTIONS(path "$dir") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == path) + assert(table.location == dir.getAbsolutePath) dir.delete() checkAnswer(spark.table("t"), Nil) @@ -1939,7 +1936,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |CREATE TABLE t(a int, b int, c int, d int) |USING parquet |PARTITIONED BY(a, b) - |LOCATION "${dir.toURI}" + |LOCATION "$dir" """.stripMargin) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) @@ -1969,9 +1966,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + assert(table.location == dir.getAbsolutePath) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } @@ -1989,9 +1985,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + assert(table.location == dir.getAbsolutePath) val partDir = new File(dir, "a=3") assert(partDir.exists()) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 24c3dfa83068..81ae5b7bdb67 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1606,7 +1606,7 @@ class HiveDDLSuite """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + assert(table.location == dir.getAbsolutePath) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } @@ -1626,7 +1626,7 @@ class HiveDDLSuite """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - assert(table.location.stripSuffix("/") == dir.getAbsolutePath.stripSuffix("/")) + assert(table.location == dir.getAbsolutePath) val partDir = new File(dir, "a=3") assert(partDir.exists()) @@ -1651,10 +1651,10 @@ class 
HiveDDLSuite |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) - + val dirPath = new Path(dir.getAbsolutePath) + val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) + assert(new Path(table.location) == fs.makeQualified(dirPath)) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } @@ -1672,10 +1672,10 @@ class HiveDDLSuite |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) - + val dirPath = new Path(dir.getAbsolutePath) + val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) + assert(new Path(table.location) == fs.makeQualified(dirPath)) val partDir = new File(dir, "a=3") assert(partDir.exists())
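
For reference, the end-to-end behavior that the added DDLSuite/HiveDDLSuite tests assert can be sketched as a small standalone program. This is only an illustrative sketch of the post-patch behavior, assuming a plain local SparkSession; the table name, the temporary directory, and the public-API assertions below are stand-ins for the suites' internal sessionState.catalog checks and are not part of the patch itself.

// Minimal sketch: CTAS for a partitioned data source table whose LOCATION already
// exists should succeed after this change. Names and paths here are illustrative.
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.{Row, SparkSession}

object CtasExistingLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ctas-with-existing-location")
      .getOrCreate()

    // Pre-create the target directory; before this patch series, CTAS into an
    // already existing path failed with "path ... already exists".
    val dir: File = Files.createTempDirectory("ctas_loc_").toFile

    spark.sql("DROP TABLE IF EXISTS t_ctas_loc")
    spark.sql(
      s"""
         |CREATE TABLE t_ctas_loc
         |USING parquet
         |PARTITIONED BY (a, b)
         |LOCATION '${dir.getAbsolutePath}'
         |AS SELECT 3 AS a, 4 AS b, 1 AS c, 2 AS d
       """.stripMargin)

    // Data lands under the pre-created location, laid out by partition columns a and b,
    // and the partition columns move to the end of the schema: (c, d, a, b) == (1, 2, 3, 4).
    assert(new File(dir, "a=3").exists())
    assert(spark.table("t_ctas_loc").collect().toSeq == Seq(Row(1, 2, 3, 4)))

    spark.stop()
  }
}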