diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 403191af5e5b..c226cc2437db 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -202,6 +202,7 @@ query insertInto : INSERT OVERWRITE TABLE tableIdentifier partitionSpec? (IF NOT EXISTS)? | INSERT INTO TABLE? tableIdentifier partitionSpec? + | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? (STORED AS format=IDENTIFIER)? ; partitionSpecLocation @@ -649,7 +650,7 @@ nonReserved | ASC | DESC | LIMIT | RENAME | SETS | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE | DESCRIBE | DROP | EXISTS | FALSE | FOR | GROUP | IN | INSERT | INTO | IS |LIKE - | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE + | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE | DIRECTORY ; SELECT: 'SELECT'; @@ -717,6 +718,7 @@ WITH: 'WITH'; VALUES: 'VALUES'; CREATE: 'CREATE'; TABLE: 'TABLE'; +DIRECTORY: 'DIRECTORY'; VIEW: 'VIEW'; REPLACE: 'REPLACE'; INSERT: 'INSERT'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 0e08bf013c8d..a2d24236470d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -55,6 +55,9 @@ object UnsupportedOperationChecker { case _: InsertIntoTable => throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") + case _: InsertIntoDir => + throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") + case Aggregate(_, _, child) if child.isStreaming => if (outputMode == Append) { throwError( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a13c03a529f3..cb83db3d61bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -27,6 +27,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ @@ -193,20 +194,121 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { } /** - * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan. + * A table property key can either be String or a collection of dot separated elements. This + * function extracts the property key based on whether its a string literal or a table property + * identifier. + */ + override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * Convert a table property list into a key-value map. 
+ * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitTablePropertyList( + ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.tableProperty.asScala.map { property => + val key = visitTablePropertyKey(property.key) + val value = Option(property.value).map(string).orNull + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties, ctx) + properties.toMap + } + + /** Empty storage format for default values and copies. */ + protected val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) + + /** + * Create a [[CatalogStorageFormat]] used for creating tables. + * + * Example format: + * {{{ + * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)] + * }}} + * + * OR + * + * {{{ + * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] + * [COLLECTION ITEMS TERMINATED BY char] + * [MAP KEYS TERMINATED BY char] + * [LINES TERMINATED BY char] + * [NULL DEFINED AS char] + * }}} + */ + protected def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { + ctx match { + case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) + case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) + } + } + + /** + * Create SERDE row format name and properties pair. + */ + override def visitRowFormatSerde( + ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { + import ctx._ + EmptyStorageFormat.copy( + serde = Option(string(name)), + serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)) + } + + /** + * Create a delimited row format properties object. + */ + override def visitRowFormatDelimited( + ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { + // Collect the entries if any. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(x)) + } + // TODO we need proper support for the NULL format. + val entries = + entry("field.delim", ctx.fieldsTerminatedBy) ++ + entry("serialization.format", ctx.fieldsTerminatedBy) ++ + entry("escape.delim", ctx.escapedBy) ++ + // The following typo is inherited from Hive... + entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ + entry("mapkey.delim", ctx.keysTerminatedBy) ++ + Option(ctx.linesSeparatedBy).toSeq.map { token => + val value = string(token) + assert( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + "line.delim" -> value + } + EmptyStorageFormat.copy(serdeProperties = entries.toMap) + } + + /** + * Add an INSERT INTO [TABLE] / INSERT OVERWRITE TABLE / INSERT OVERWRITE DIRECTORY + * operation to the logical plan. 
*/ private def withInsertInto( ctx: InsertIntoContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { - val tableIdent = visitTableIdentifier(ctx.tableIdentifier) + val tableIdent = Option(ctx.tableIdentifier).map(visitTableIdentifier).getOrElse(None) val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) - InsertIntoTable( - UnresolvedRelation(tableIdent, None), - partitionKeys, - query, - ctx.OVERWRITE != null, - ctx.EXISTS != null) + var storageFormat = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + storageFormat = storageFormat.copy(serde = Option(ctx.format).map(format => format.getText)) + + tableIdent match { + case Some(ti: TableIdentifier) => InsertIntoTable(UnresolvedRelation(ti, None), + partitionKeys, + query, + ctx.OVERWRITE != null, ctx.EXISTS != null) + case _ => InsertIntoDir(string(ctx.path), ctx.LOCAL != null, storageFormat, query) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index ca0096eeb208..f628af614ec2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ @@ -386,6 +387,16 @@ case class InsertIntoTable( } } +case class InsertIntoDir( + path: String, + isLocal: Boolean, + rowFormat: CatalogStorageFormat, + child: LogicalPlan) + extends LogicalPlan { + override def children: Seq[LogicalPlan] = child :: Nil + override def output: Seq[Attribute] = Seq.empty +} + /** * A container for holding named common table expressions (CTEs) and a query plan. * This operator will be removed during analysis and the relations will be substituted into child. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f85d6062e8d3..d06bb48275ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -381,22 +381,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ) } - /** - * Convert a table property list into a key-value map. - * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. - */ - override def visitTablePropertyList( - ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { - val properties = ctx.tableProperty.asScala.map { property => - val key = visitTablePropertyKey(property.key) - val value = Option(property.value).map(string).orNull - key -> value - } - // Check for duplicate property names. - checkDuplicateKeys(properties, ctx) - properties.toMap - } - /** * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. 
*/ @@ -423,22 +407,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { props.keys.toSeq } - /** - * A table property key can either be String or a collection of dot separated elements. This - * function extracts the property key based on whether its a string literal or a table property - * identifier. - */ - override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { - if (key.STRING != null) { - string(key.STRING) - } else { - key.getText - } - } - /** * Create a [[CreateDatabaseCommand]] command. - * * For example: * {{{ * CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] @@ -950,9 +920,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** Empty storage format for default values and copies. */ - private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) - /** * Create a [[CatalogStorageFormat]]. */ @@ -980,70 +947,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** - * Create a [[CatalogStorageFormat]] used for creating tables. - * - * Example format: - * {{{ - * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)] - * }}} - * - * OR - * - * {{{ - * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] - * [COLLECTION ITEMS TERMINATED BY char] - * [MAP KEYS TERMINATED BY char] - * [LINES TERMINATED BY char] - * [NULL DEFINED AS char] - * }}} - */ - private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { - ctx match { - case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) - case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) - } - } - - /** - * Create SERDE row format name and properties pair. - */ - override def visitRowFormatSerde( - ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { - import ctx._ - EmptyStorageFormat.copy( - serde = Option(string(name)), - serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) - } - - /** - * Create a delimited row format properties object. - */ - override def visitRowFormatDelimited( - ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { - // Collect the entries if any. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).toSeq.map(x => key -> string(x)) - } - // TODO we need proper support for the NULL format. - val entries = - entry("field.delim", ctx.fieldsTerminatedBy) ++ - entry("serialization.format", ctx.fieldsTerminatedBy) ++ - entry("escape.delim", ctx.escapedBy) ++ - // The following typo is inherited from Hive... - entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ - entry("mapkey.delim", ctx.keysTerminatedBy) ++ - Option(ctx.linesSeparatedBy).toSeq.map { token => - val value = string(token) - assert( - value == "\n", - s"LINES TERMINATED BY only supports newline '\\n' right now: $value", - ctx) - "line.delim" -> value - } - EmptyStorageFormat.copy(serdeProperties = entries.toMap) - } - /** * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT * and STORED AS. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 71b180e55b58..3ccf23de21ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -50,6 +50,10 @@ private[hive] trait HiveStrategies { table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil + case logical.InsertIntoDir( + path, isLocal, rowFormat, child) => + execution.InsertIntoDir( + path, isLocal, rowFormat, planLater(child)) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala new file mode 100644 index 000000000000..efa56be370bd --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.File +import java.util.Properties + +import scala.language.existentials + +import antlr.SemanticException +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.mapred._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.sql.internal.HiveSerDe +import org.apache.spark.util.{SerializableJobConf, Utils} + +case class InsertIntoDir( + path: String, + isLocal: Boolean, + rowFormat: CatalogStorageFormat, + child: SparkPlan) extends SaveAsHiveFile { + + @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + def output: Seq[Attribute] = Seq.empty + + protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val hadoopConf = sessionState.newHadoopConf() + + val properties = new Properties() + + val Array(cols, types) = child.output.foldLeft(Array("", "")) { case (r, a) => + r(0) = r(0) + a.name + "," + r(1) = r(1) + a.dataType.typeName + ":" + r + } + + properties.put("columns", cols.dropRight(1)) + properties.put("columns.types", types.dropRight(1)) + + val defaultSerde = hadoopConf.get("hive.default.fileformat", "textFile") + val serDe = rowFormat.serde.getOrElse(defaultSerde).toLowerCase + val hiveSerDe = HiveSerDe.sourceToSerDe(serDe, sessionState.conf).getOrElse( + throw new SemanticException(s"Unrecognized serde format ${serDe}")) + + properties.put(serdeConstants.SERIALIZATION_LIB, + hiveSerDe.serde.getOrElse(classOf[LazySimpleSerDe].getName)) + + import scala.collection.JavaConverters._ + properties.putAll(rowFormat.serdeProperties.asJava) + + val tableDesc = new TableDesc( + classOf[TextInputFormat], + Utils.classForName(hiveSerDe.outputFormat.get), + properties + ) + + val isCompressed = + sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean + + val targetPath = new Path(path) + + val fileSinkConf = new FileSinkDesc(targetPath.toString, tableDesc, isCompressed) + + val jobConf = new JobConf(hadoopConf) + jobConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") + + val jobConfSer = new SerializableJobConf(jobConf) + + val writerContainer = new SparkHiveWriterContainer( + jobConf, + fileSinkConf, + child.output) + + if( !isLocal ) { + FileSystem.get(jobConf).delete(targetPath, true) + } + + @transient val outputClass = writerContainer.newSerializer(tableDesc).getSerializedClass + saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer, + isCompressed) + + val outputPath = FileOutputFormat.getOutputPath(jobConf) + if( isLocal ) { + Utils.deleteRecursively(new File(path)) + outputPath.getFileSystem(hadoopConf).copyToLocalFile(true, outputPath, targetPath) + log.info(s"Copied results from ${outputPath} to local dir ${path}") + } else { + log.info(s"Results available at path ${outputPath}") + } + + Seq.empty[InternalRow] + } + + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + + protected override def doExecute(): RDD[InternalRow] = { + 
sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3805674d3958..5db1bcf490c5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.SparkException @@ -47,7 +47,7 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], child: SparkPlan, overwrite: Boolean, - ifNotExists: Boolean) extends UnaryExecNode { + ifNotExists: Boolean) extends SaveAsHiveFile { @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] @transient private val client = sessionState.metadataHive @@ -109,28 +109,6 @@ case class InsertIntoHiveTable( new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000 } - private def saveAsHiveFile( - rdd: RDD[InternalRow], - valueClass: Class[_], - fileSinkConf: FileSinkDesc, - conf: SerializableJobConf, - writerContainer: SparkHiveWriterContainer): Unit = { - assert(valueClass != null, "Output value class not set") - conf.value.setOutputValueClass(valueClass) - - val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName - assert(outputFileFormatClassName != null, "Output format class not set") - conf.value.set("mapred.output.format.class", outputFileFormatClassName) - - FileOutputFormat.setOutputPath( - conf.value, - SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) - log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - writerContainer.driverSideSetup() - sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) - writerContainer.commitJob() - } - /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -146,18 +124,6 @@ case class InsertIntoHiveTable( val hadoopConf = sessionState.newHadoopConf() val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val isCompressed = - sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean - - if (isCompressed) { - // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", - // and "mapred.output.compression.type" have no impact on ORC because it uses table properties - // to store compression information. 
- hadoopConf.set("mapred.output.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type")) - } val numDynamicPartitions = partition.values.count(_.isEmpty) val numStaticPartitions = partition.values.count(_.nonEmpty) @@ -203,18 +169,6 @@ case class InsertIntoHiveTable( val jobConf = new JobConf(hadoopConf) val jobConfSer = new SerializableJobConf(jobConf) - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false) - val outputCommitterClass = jobConf.get("mapred.output.committer.class", "") - if (speculationEnabled && outputCommitterClass.contains("Direct")) { - val warningMessage = - s"$outputCommitterClass may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use a output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." - logWarning(warningMessage) - } val writerContainer = if (numDynamicPartitions > 0) { val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) @@ -228,12 +182,12 @@ case class InsertIntoHiveTable( new SparkHiveWriterContainer( jobConf, fileSinkConf, - child.output, - table) + child.output) } @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass - saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) + saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer, + sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala new file mode 100644 index 000000000000..1bd2296b0779 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.mapred.FileOutputFormat + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.UnaryExecNode +import org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc +import org.apache.spark.sql.hive.SparkHiveWriterContainer +import org.apache.spark.util.SerializableJobConf + +// Base trait from which all hive insert statement physical execution extends. +private[hive] trait SaveAsHiveFile extends UnaryExecNode { + +protected def saveAsHiveFile( + rdd: RDD[InternalRow], + valueClass: Class[_], + fileSinkConf: ShimFileSinkDesc, + conf: SerializableJobConf, + writerContainer: SparkHiveWriterContainer, + isCompressed: Boolean): Unit = { + assert(valueClass != null, "Output value class not set") + + if (isCompressed) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. + conf.value.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type")) + } + + // When speculation is on and output committer class name contains "Direct", we should warn + // users that they may loss data if they are using a direct output committer. + val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false) + val outputCommitterClass = conf.value.get("mapred.output.committer.class", "") + if (speculationEnabled && outputCommitterClass.contains("Direct")) { + val warningMessage = + s"$outputCommitterClass may be an output committer that writes data directly to " + + "the final location. Because speculation is enabled, this output committer may " + + "cause data loss (see the case in SPARK-10063). If possible, please use a output " + + "committer that does not have this behavior (e.g. FileOutputCommitter)." 
+ logWarning(warningMessage) + } + conf.value.setOutputValueClass(valueClass) + + + val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName + assert(outputFileFormatClassName != null, "Output format class not set") + conf.value.set("mapred.output.format.class", outputFileFormatClassName) + + FileOutputFormat.setOutputPath( + conf.value, + SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value)) + log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + writerContainer.driverSideSetup() + sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) + writerContainer.commitJob() + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 794fe264ead5..566b9e24202b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -52,8 +52,7 @@ import org.apache.spark.util.SerializableJobConf private[hive] class SparkHiveWriterContainer( @transient private val jobConf: JobConf, fileSinkConf: FileSinkDesc, - inputSchema: Seq[Attribute], - table: MetastoreRelation) + inputSchema: Seq[Attribute]) extends Logging with HiveInspectors with Serializable { @@ -218,7 +217,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( dynamicPartColNames: Array[String], inputSchema: Seq[Attribute], table: MetastoreRelation) - extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) { + extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) { import SparkHiveDynamicPartitionWriterContainer._
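
For context on what the grammar, planner, and HiveStrategies changes above enable end to end, here is a minimal usage sketch. It is not part of the patch: it assumes a Hive-enabled SQLContext bound to `sqlContext` and an existing `people` table (both hypothetical), and the output path is arbitrary. The LOCAL, ROW FORMAT, and STORED AS clauses exercise the new `insertInto` alternative, which AstBuilder turns into an InsertIntoDir logical plan.

// Minimal sketch only -- not part of the patch. `sqlContext`, the `people` table,
// and the output path are assumptions for illustration.
sqlContext.sql(
  """INSERT OVERWRITE LOCAL DIRECTORY '/tmp/people_out'
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |STORED AS textfile
    |SELECT name, age FROM people""".stripMargin)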
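
Also for reference, the `visitRowFormatDelimited` logic moved into AstBuilder above folds a DELIMITED clause into Hive serde properties. A sketch of the map it is expected to produce for a hypothetical clause, using only key names that appear in the patch (the `colelction.delim` spelling is Hive's own, kept intentionally as the code comment notes):

// Expected serdeProperties for the (hypothetical) clause:
//   ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\'
//     COLLECTION ITEMS TERMINATED BY '|' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n'
// Illustrative only; key names are copied from visitRowFormatDelimited above.
val delimitedProps = Map(
  "field.delim"          -> ",",
  "serialization.format" -> ",",   // visitRowFormatDelimited reuses the field delimiter here
  "escape.delim"         -> "\\",
  "colelction.delim"     -> "|",
  "mapkey.delim"         -> ":",
  "line.delim"           -> "\n")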