diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index 657842c620f30..6ba6713b69999 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -47,11 +47,22 @@ object SparkHadoopWriterUtils {
    * @return a job ID
    */
   def createJobID(time: Date, id: Int): JobID = {
+    val jobTrackerID = createJobTrackerID(time)
+    createJobID(jobTrackerID, id)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
     if (id < 0) {
       throw new IllegalArgumentException("Job number is negative")
     }
-    val jobtrackerID = createJobTrackerID(time)
-    new JobID(jobtrackerID, id)
+    new JobID(jobTrackerID, id)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 43b18a3b2d10c..ce721cd522d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -201,14 +201,14 @@ object FileFormatWriter extends Logging {
         rdd
       }
 
-      val jobIdInstant = new Date().getTime
+      val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
       val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
       sparkSession.sparkContext.runJob(
         rddWithNonEmptyPartitions,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
-            jobIdInstant = jobIdInstant,
+            jobTrackerID = jobTrackerID,
             sparkStageId = taskContext.stageId(),
             sparkPartitionId = taskContext.partitionId(),
             sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -244,7 +244,7 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
-      jobIdInstant: Long,
+      jobTrackerID: String,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
@@ -252,7 +252,7 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow],
       concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index d827e83623570..ea13e2deac8ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
     committer.setupTask(taskAttemptContext)
@@ -40,7 +43,6 @@ case class FileWriterFactory (
   }
 
   private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
     val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
     // Set up the configuration object
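For context, here is a minimal sketch of the behaviour the patch relies on: deriving the job tracker string once and reusing it means every task builds its `JobID`, `TaskID`, and `TaskAttemptID` from the same prefix, whereas calling `new Date()` independently in each task could yield different prefixes for the same write job. The sketch uses only the Hadoop classes visible in the diff; the `yyyyMMddHHmmss` format and the helper names below are illustrative assumptions, not part of the patch.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Sketch only: jobTrackerID stands in for the timestamp-derived string that
// SparkHadoopWriterUtils.createJobTrackerID produces in the patch above.
object ConsistentJobIdSketch {
  def main(args: Array[String]): Unit = {
    // "Driver" side: format the timestamp once and reuse the resulting string.
    val jobTrackerID = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())

    // "Task" side: each task derives its IDs from the shared tracker string,
    // so tasks of the same write job can never disagree on the job prefix.
    def taskAttemptIdFor(stageId: Int, partitionId: Int, attempt: Int): TaskAttemptID = {
      val jobId = new JobID(jobTrackerID, stageId)
      val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
      new TaskAttemptID(taskId, attempt)
    }

    // Same job prefix, differing only in partition/attempt numbers.
    println(taskAttemptIdFor(stageId = 0, partitionId = 3, attempt = 0))
    println(taskAttemptIdFor(stageId = 0, partitionId = 4, attempt = 0))
  }
}
```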