@@ -51,7 +51,7 @@ private[libsvm] class LibSVMOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
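For reference, the `getDefaultWorkFile` overrides touched throughout this diff all derive the per-task output file name from the shared write-job UUID and the task's split id. A minimal sketch of that naming scheme, with placeholder values (the real UUID comes from the job configuration and the split id from the Hadoop `TaskAttemptID`; the extension shown here is just an assumption for this writer):

```scala
object FileNameSketch extends App {
  // Placeholder values for illustration; see getDefaultWorkFile above for the real sources.
  val uniqueWriteJobId = "ea518ad4-455a-4431-b471-d24e03814677"  // job-level UUID
  val split = 5                                                  // task (partition) id
  val extension = ".libsvm"                                      // assumed extension for this writer

  // Same format string as in the LibSVM writer: zero-padded split id, then the job UUID.
  val fileName = f"part-r-$split%05d-$uniqueWriteJobId$extension"
  println(fileName)  // part-r-00005-ea518ad4-455a-4431-b471-d24e03814677.libsvm
}
```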
@@ -44,6 +44,8 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 /** A helper object for writing data out to a location. */
 object WriteOutput extends Logging {
 
+  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
+
   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
       val serializableHadoopConf: SerializableConfiguration,
@@ -287,25 +289,26 @@ object WriteOutput extends Logging {
       }
     }
 
-    private def getBucketIdFromKey(key: InternalRow): Option[Int] =
-      description.bucketSpec.map { _ => key.getInt(description.partitionColumns.length) }
-
     /**
      * Open and returns a new OutputWriter given a partition key and optional bucket id.
      * If bucket id is specified, we will append it to the end of the file name, but before the
      * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
      */
-    private def newOutputWriter(
-        key: InternalRow,
-        getPartitionString: UnsafeProjection): OutputWriter = {
+    private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = {
       val path =
         if (description.partitionColumns.nonEmpty) {
-          val partitionPath = getPartitionString(key).getString(0)
+          val partitionPath = partString(key).getString(0)
           new Path(stagingPath, partitionPath).toString
         } else {
           stagingPath
         }
-      val bucketId = getBucketIdFromKey(key)
+
+      // If the bucket spec is defined, the bucket column is right after the partition columns
+      val bucketId = if (description.bucketSpec.isDefined) {
+        Some(key.getInt(description.partitionColumns.length))
+      } else {
+        None
+      }
 
       val newWriter = description.outputWriterFactory.newInstance(
         path = path,
@@ -319,7 +322,7 @@
     override def execute(iter: Iterator[InternalRow]): Unit = {
       // We should first sort by partition columns, then bucket id, and finally sorting columns.
       val sortingExpressions: Seq[Expression] =
-          description.partitionColumns ++ bucketIdExpression ++ sortColumns
+        description.partitionColumns ++ bucketIdExpression ++ sortColumns
       val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
 
       val sortingKeySchema = StructType(sortingExpressions.map {
@@ -333,8 +336,8 @@
         description.nonPartitionColumns, description.allColumns)
 
       // Returns the partition path given a partition key.
-      val getPartitionString =
-        UnsafeProjection.create(Seq(Concat(partitionStringExpression)), description.partitionColumns)
+      val getPartitionString = UnsafeProjection.create(
+        Seq(Concat(partitionStringExpression)), description.partitionColumns)
 
       // Sorts the data before write, so that we only need one writer at the same time.
       val sorter = new UnsafeKVExternalSorter(
@@ -414,7 +417,7 @@
     // `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used to identify a job
     // rather than a single task output file is that, speculative tasks must generate the same
     // output file name as the original task.
-    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)
+    job.getConfiguration.set(WriteOutput.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)
 
     val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId)
     val outputCommitter = newOutputCommitter(
@@ -474,7 +477,3 @@
       }
     }
   }
-
-object WriterContainer {
-  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
-}
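A note on the inlined bucket-id lookup in `newOutputWriter`: because the sorting key is built as partition columns, then the bucket id, then the sort columns, the bucket id sits at index `description.partitionColumns.length` of the key row whenever a bucket spec is defined. A rough, self-contained sketch of that layout using a `GenericInternalRow` with made-up values:

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

object BucketIdSketch extends App {
  // Hypothetical key with two partition columns, a bucket id, and one sort column.
  // Layout mirrors the sorting key built in execute(): [partition columns][bucket id][sort columns].
  val numPartitionColumns = 2
  val key = new GenericInternalRow(Array[Any](
    UTF8String.fromString("2016"),  // partition column 0
    UTF8String.fromString("10"),    // partition column 1
    42,                             // bucket id, right after the partition columns
    7L))                            // a sort column (not consulted here)

  // Equivalent of the inlined lookup above: read the bucket id only when a bucket spec is defined.
  val bucketSpecDefined = true
  val bucketId: Option[Int] =
    if (bucketSpecDefined) Some(key.getInt(numPartitionColumns)) else None
  println(bucketId)  // Some(42)
}
```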
@@ -31,7 +31,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriteOutput}
 import org.apache.spark.sql.types._
 
 object CSVRelation extends Logging {
@@ -200,7 +200,7 @@ private[csv] class CsvOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
@@ -169,7 +169,7 @@ private[json] class JsonOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriterContainer}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriteOutput}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -155,7 +155,7 @@ private[parquet] class ParquetOutputWriter(
       // partitions in the case of dynamic partitioning.
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
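The JSON, Parquet and ORC writers above additionally splice a bucket suffix, built by `BucketingUtils.bucketIdToString`, between the job UUID and the file extension, matching the example in the `newOutputWriter` doc comment (`part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet`). A sketch of that composition with a stand-in for the bucket suffix (the exact separator and padding are whatever `bucketIdToString` actually produces; the form below just follows the doc comment):

```scala
object BucketedFileNameSketch extends App {
  // Placeholder values for illustration only.
  val uniqueWriteJobId = "ea518ad4-455a-4431-b471-d24e03814677"
  val split = 9
  val extension = ".gz.parquet"

  // Stand-in for bucketId.map(BucketingUtils.bucketIdToString).getOrElse("");
  // the "-00002" shape is taken from the doc comment above, not from the real helper.
  val bucketId: Option[Int] = Some(2)
  val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")

  // The bucket suffix goes after the job UUID and before the extension.
  val fileName = f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension"
  println(fileName)  // part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
}
```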
@@ -133,7 +133,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
@@ -226,7 +226,7 @@ private[orc] class OrcOutputWriter(
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-    val uniqueWriteJobId = conf.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+    val uniqueWriteJobId = conf.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
     val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
@@ -144,7 +144,7 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
     val configuration = context.getConfiguration
-    val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
+    val uniqueWriteJobId = configuration.get(WriteOutput.DATASOURCE_WRITEJOBUUID)
     val taskAttemptId = context.getTaskAttemptID
     val split = taskAttemptId.getTaskID.getId
     val name = FileOutputFormat.getOutputName(context)
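Finally, the comment in `WriteOutput` above explains why a single job-level UUID, rather than a per-task identifier, is stamped into the configuration: speculative attempts of the same task must produce the same output file name. A minimal sketch of that round trip through a Hadoop `Configuration`, standalone and outside Spark:

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration

object WriteJobUuidSketch extends App {
  // The constant that this PR moves from WriterContainer into WriteOutput.
  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"

  // Driver side: one UUID per write job, set once on the job configuration.
  val conf = new Configuration()
  conf.set(DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)

  // Task side: every attempt (including speculative retries) reads the same value back,
  // so getDefaultWorkFile yields an identical file name across attempts.
  val uniqueWriteJobId = conf.get(DATASOURCE_WRITEJOBUUID)
  println(uniqueWriteJobId)
}
```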