diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 4fef07116171..f887deb0e517 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -208,6 +208,23 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    */
   override def whiteList = Seq(
     "add_part_exist",
+    "dynamic_partition_skip_default",
+    "infer_bucket_sort_dyn_part",
+    "load_dyn_part1",
+    "load_dyn_part2",
+    "load_dyn_part3",
+    "load_dyn_part4",
+    "load_dyn_part5",
+    "load_dyn_part6",
+    "load_dyn_part7",
+    "load_dyn_part8",
+    "load_dyn_part9",
+    "load_dyn_part10",
+    "load_dyn_part11",
+    "load_dyn_part12",
+    "load_dyn_part13",
+    "load_dyn_part14",
+    "load_dyn_part14_win",
     "add_part_multiple",
     "add_partition_no_whitelist",
     "add_partition_with_whitelist",
diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ab7862f4f9e0..666d0749622c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
       null)
   }
 
+  def open(dynamicPartPath: String) {
+    val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get()
+    numfmt.setMinimumIntegerDigits(5)
+    numfmt.setGroupingUsed(false)
+
+    val extension = Utilities.getFileExtension(
+      conf.value,
+      fileSinkConf.getCompressed,
+      getOutputFormat())
+
+    val outputName = "part-" + numfmt.format(splitID) + extension
+    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
+    if (outputPath == null) {
+      throw new IOException("Undefined job output-path")
+    }
+    val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove the leading "/"
+    val path = new Path(workPath, outputName)
+    getOutputCommitter().setupTask(getTaskContext())
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      path,
+      Reporter.NULL)
+  }
+
   def write(value: Writable) {
     if (writer != null) {
       writer.write(value)
@@ -192,4 +219,10 @@ private[hive] object SparkHiveHadoopWriter {
     }
     outputPath.makeQualified(fs)
   }
+
+  val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
+    override def initialValue() = {
+      NumberFormat.getInstance()
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 05b2f5f6cd3f..8d13867c738f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -822,11 +822,6 @@ private[hive] object HiveQl {
           cleanIdentifier(key.toLowerCase) -> None
         }.toMap).getOrElse(Map.empty)
 
-        if (partitionKeys.values.exists(p => p.isEmpty)) {
-          throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
-            s"dynamic partitioning.")
-        }
-
         InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
 
     case a: ASTNode =>
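The new `open(dynamicPartPath)` above derives each task's output file name from the split ID through a thread-local `NumberFormat`. A minimal standalone sketch of that naming rule (the `PartFileName` object is illustrative, not part of the patch):

```scala
import java.text.NumberFormat

// Illustrative only: mirrors how open(dynamicPartPath) builds "part-<splitID><extension>".
object PartFileName {
  def apply(splitID: Int, extension: String): String = {
    val numfmt = NumberFormat.getInstance()
    numfmt.setMinimumIntegerDigits(5) // zero-pad to at least five digits
    numfmt.setGroupingUsed(false)     // avoid "10,000"-style separators
    "part-" + numfmt.format(splitID) + extension
  }
}

// PartFileName(7, ".gz") == "part-00007.gz"
```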
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 39033bdeac4b..82089c817e8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,6 +24,7 @@ import java.util.{HashMap => JHashMap}
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Serializer
@@ -34,12 +35,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharOb
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
+import org.apache.hadoop.hive.conf.HiveConf
 
 /**
  * :: DeveloperApi ::
@@ -101,62 +103,133 @@ case class InsertIntoHiveTable(
   }
 
   def saveAsHiveFile(
-      rdd: RDD[Writable],
+      rdd: RDD[(Writable, String)],
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
-      conf: JobConf,
-      isCompressed: Boolean) {
+      conf: SerializableWritable[JobConf],
+      isCompressed: Boolean,
+      dynamicPartNum: Int) {
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    conf.setOutputValueClass(valueClass)
+    conf.value.setOutputValueClass(valueClass)
     if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
       throw new SparkException("Output format class not set")
     }
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    conf.value.set("mapred.output.format.class",
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName)
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      conf.set("mapred.output.compress", "true")
+      conf.value.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(
-      conf,
-      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+    conf.value.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(
+      conf.value,
+      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
+    var writer: SparkHiveHadoopWriter = null
+    // Map holding one writer per dynamic-partition directory
+    var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null
+    if (dynamicPartNum == 0) {
+      writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
+      writer.preSetup()
+    } else {
+      writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
+    }
 
-    val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
-    writer.preSetup()
-
-    def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
+    def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
+      // writer used when the insert has no dynamic partitions
+      if (dynamicPartNum == 0) {
         writer.setup(context.stageId, context.partitionId, attemptNumber)
         writer.open()
+      }
       var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record)
+      // writer used for dynamic partitions
+      var writer2: SparkHiveHadoopWriter = null
+      while (iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        if (record._2 == null) { // no dynamic partition for this record
+          writer.write(record._1)
+        } else { // dynamic partition: route the record to its partition's writer
+          val location = fileSinkConf.getDirName
+          val partLocation = location + record._2
+          writer2 = writerMap.get(record._2) match {
+            case Some(writer) => writer
+            case None =>
+              val tempWriter = new SparkHiveHadoopWriter(
+                conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
+              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
+              tempWriter.open(record._2)
+              writerMap += (record._2 -> tempWriter)
+              tempWriter
+          }
+          writer2.write(record._1)
+        }
+      }
+      if (dynamicPartNum == 0) {
+        writer.close()
+        writer.commit()
+      } else {
+        for ((k, v) <- writerMap) {
+          v.close()
+          v.commit()
+        }
       }
-      writer.close()
-      writer.commit()
     }
 
     sc.sparkContext.runJob(rdd, writeToFile _)
-    writer.commitJob()
+    if (dynamicPartNum == 0) {
+      writer.commitJob()
+    } else {
+      for ((k, v) <- writerMap) {
+        v.commitJob()
+      }
+      writerMap.clear()
+    }
   }
+
+  /*
+   * Builds the dynamic-partition directory suffix for a row. e.g.
+   * for sql: INSERT ... tablename(part1, part2) SELECT ..., val1, val2 FROM ...
+   *   return: /part1=val1/part2=val2
+   * for sql: INSERT ... tablename(part1=val1, part2) SELECT ..., val2 FROM ...
+   *   return: /part2=val2
+   * for sql: INSERT ... tablename(part1=val1, part2, part3) SELECT ..., val2, val3 FROM ...
+   *   return: /part2=val2/part3=val3
+   */
+  private def getDynamicPartDir(
+      partCols: Array[String],
+      row: Row,
+      dynamicPartNum: Int,
+      defaultPartName: String): String = {
+    assert(dynamicPartNum > 0)
+    partCols
+      .takeRight(dynamicPartNum)
+      .zip(row.takeRight(dynamicPartNum))
+      .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
+      .mkString
+  }
+
+  /*
+   * Returns the partition value as a string, substituting the configured default partition
+   * name (hive.exec.default.partition.name) when the value is null or the empty string.
+   */
+  private def handleNull(rowVal: Any, defaultPartName: String): String = {
+    if (rowVal == null || String.valueOf(rowVal).length == 0) {
+      defaultPartName
+    } else {
+      String.valueOf(rowVal)
+    }
+  }
 
   override def execute() = result
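A worked example of the third case in the comment above, using plain collections (no Hive classes; `handleNull` is inlined for the sketch):

```scala
// partition (part1=val1, part2, part3): the last two partition columns are dynamic.
val partCols = Array("part1", "part2", "part3")
val rowTail: Seq[Any] = Seq("val2", null) // trailing row values for the dynamic columns
val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

def handleNull(v: Any): String =
  if (v == null || String.valueOf(v).isEmpty) defaultPartName else String.valueOf(v)

val dir = partCols
  .takeRight(2)
  .zip(rowTail)
  .map { case (c, v) => s"/$c=${handleNull(v)}" }
  .mkString
assert(dir == "/part2=val2/part3=__HIVE_DEFAULT_PARTITION__")
```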
@@ -178,6 +251,39 @@ case class InsertIntoHiveTable(
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    var tmpDynamicPartNum = 0
+    var numStaPart = 0
+    val partitionSpec = partition.map {
+      case (key, Some(value)) =>
+        numStaPart += 1
+        key -> value
+      case (key, None) =>
+        tmpDynamicPartNum += 1
+        key -> ""
+    }
+    val dynamicPartNum = tmpDynamicPartNum
+    val jobConf = new JobConf(sc.hiveconf)
+    val jobConfSer = new SerializableWritable(jobConf)
+    // check that the partition spec is valid
+    if (dynamicPartNum > 0) {
+      if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
+      }
+      if (numStaPart == 0 &&
+          sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
+      }
+      // check that no static partition appears after a dynamic one
+      for ((k, v) <- partitionSpec) {
+        if (v == "") {
+          if (numStaPart > 0) { // found a dynamic partition while static ones still follow
+            throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
+          }
+        } else {
+          numStaPart -= 1
+        }
+      }
+    }
+
     val rdd = childRdd.mapPartitions { iter =>
       val serializer = newSerializer(fileSinkConf.getTableInfo)
       val standardOI = ObjectInspectorUtils
@@ -186,32 +292,41 @@ case class InsertIntoHiveTable(
           ObjectInspectorCopyOption.JAVA)
         .asInstanceOf[StructObjectInspector]
 
-
       val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
       val outputData = new Array[Any](fieldOIs.length)
+      val defaultPartName = jobConfSer.value.get(
+        "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+      var partColStr: Array[String] = null
+      if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
+        partColStr = fileSinkConf
+          .getTableInfo
+          .getProperties
+          .getProperty("partition_columns")
+          .split("/")
+      }
+
       iter.map { row =>
+        var dynamicPartPath: String = null
+        if (dynamicPartNum > 0) {
+          dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName)
+        }
         var i = 0
-        while (i < row.length) {
+        while (i < fieldOIs.length) {
           // Casts Strings to HiveVarchars when necessary.
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
         }
 
-        serializer.serialize(outputData, standardOI)
+        serializer.serialize(outputData, standardOI) -> dynamicPartPath
       }
     }
 
-    // ORC stores compression information in table properties. While, there are other formats
-    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
-    val jobConf = new JobConf(sc.hiveconf)
-    saveAsHiveFile(
-      rdd,
-      outputClass,
-      fileSinkConf,
-      jobConf,
-      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
-
-    // TODO: Handle dynamic partitioning.
+    saveAsHiveFile(
+      rdd,
+      outputClass,
+      fileSinkConf,
+      jobConfSer,
+      sc.hiveconf.getBoolean("hive.exec.compress.output", false),
+      dynamicPartNum)
+
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
     val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
@@ -220,10 +335,6 @@ case class InsertIntoHiveTable(
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
-      val partitionSpec = partition.map {
-        case (key, Some(value)) => key -> value
-        case (key, None) => key -> "" // Should not reach here right now.
-      }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
       db.validatePartitionNameCharacters(partVals)
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -231,14 +342,26 @@ case class InsertIntoHiveTable(
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
-      db.loadPartition(
-        outputPath,
-        qualifiedTableName,
-        partitionSpec,
-        overwrite,
-        holdDDLTime,
-        inheritTableSpecs,
-        isSkewedStoreAsSubdir)
+      if (dynamicPartNum > 0) {
+        db.loadDynamicPartitions(
+          outputPath,
+          qualifiedTableName,
+          partitionSpec,
+          overwrite,
+          dynamicPartNum,
+          holdDDLTime,
+          isSkewedStoreAsSubdir)
+      } else {
+        db.loadPartition(
+          outputPath,
+          qualifiedTableName,
+          partitionSpec,
+          overwrite,
+          holdDDLTime,
+          inheritTableSpecs,
+          isSkewedStoreAsSubdir)
+      }
     } else {
       db.loadTable(
         outputPath,
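The new partition-spec checks above mirror Hive's rules: dynamic partitioning must be enabled, strict mode requires at least one static partition, and a static partition may not follow a dynamic one. A hedged sketch of the same rules over plain data (`validatePartitionSpec` is illustrative, not part of the patch):

```scala
// spec preserves the INSERT's partition-column order; None marks a dynamic column.
def validatePartitionSpec(
    spec: Seq[(String, Option[String])],
    dynamicEnabled: Boolean,
    strictMode: Boolean): Unit = {
  val firstDynamic = spec.indexWhere(_._2.isEmpty)
  if (firstDynamic >= 0) {
    require(dynamicEnabled, "dynamic partitioning is disabled (hive.exec.dynamic.partition)")
    require(!(strictMode && spec.forall(_._2.isEmpty)),
      "strict mode requires at least one static partition column")
    require(spec.drop(firstDynamic).forall(_._2.isEmpty),
      "a static partition column may not appear after a dynamic one")
  }
}

// validatePartitionSpec(Seq("p1" -> Some("v1"), "p2" -> None), true, true)  // ok
// validatePartitionSpec(Seq("p1" -> None, "p2" -> Some("v2")), true, false) // throws
```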
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 b/sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe b/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe
new file mode 100644
index 000000000000..573541ac9702
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe
@@ -0,0 +1 @@
+0
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 b/sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 b/sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee b/sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed b/sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index fdb2f41f5a5b..e34a61eb71f6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -191,6 +191,18 @@ class HiveQuerySuite extends HiveComparisonTest {
       |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
     """.stripMargin)
 
+  createQueryTest("dynamic_partiton",
+    """
+      |drop table IF EXISTS dynamic_part_table;
+      |create table dynamic_part_table(intcol int) partitioned by (partcol1 int, partcol2 int);
+      |set hive.exec.dynamic.partition.mode=nonstrict;
+      |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, 1 from src where key=150;
+      |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, 1 from src where key=150;
+      |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, NULL from src where key=150;
+      |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, NULL from src where key=150;
+      |drop table IF EXISTS dynamic_part_table;
+    """.stripMargin)
+
   createQueryTest("lateral view5",
     "FROM src SELECT explode(array(key+3, key+4))")
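Taken together, the patch makes statements like the following work end to end. A hedged usage sketch against the HiveContext API of this era (assuming a `hiveContext` in scope and the test `src` table; `dp_demo` is an illustrative table name):

```scala
// Enable dynamic partitioning; nonstrict mode allows all partition columns to be dynamic.
hiveContext.hql("set hive.exec.dynamic.partition=true")
hiveContext.hql("set hive.exec.dynamic.partition.mode=nonstrict")
hiveContext.hql(
  "create table dp_demo (intcol int) partitioned by (partcol1 int, partcol2 int)")
// partcol1 and partcol2 are both dynamic; a NULL partition value lands in the
// __HIVE_DEFAULT_PARTITION__ directory via handleNull.
hiveContext.hql(
  "insert into table dp_demo partition (partcol1, partcol2) " +
    "select 1, 1, NULL from src where key = 150")
```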