diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8ab..4b1a099d3bac9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
-  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: jobId across tasks should be consistent to meet the contract
+  // expected by Hadoop committers, but `JobId` cannot be serialized.
+  // thus, persist the serializable jobTrackerID in the class and make jobId a
+  // transient lazy val which recreates it each time to ensure jobId is unique.
+  private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)

   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
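
For context, here is a minimal standalone sketch of the pattern the patch relies on: keep a serializable seed as a regular field and rebuild the non-serializable object in a `@transient lazy val`, so each task reconstructs an identical value after deserialization. The names below (`ExampleFactory`, `NotSerializableId`, `trackerId`) are illustrative stand-ins, not part of the Spark change.

```scala
import java.util.Date

// Stand-in for a handle that cannot be serialized (like Hadoop's JobID).
class NotSerializableId(val trackerId: String, val index: Int) {
  override def toString: String = s"job_${trackerId}_$index"
}

case class ExampleFactory(label: String) {
  // Serializable seed captured once on the driver and shipped with the factory.
  private[this] val trackerId: String = new Date().getTime.toString

  // Not serialized; rebuilt lazily on each executor from the seed, so every
  // task derives the same deterministic value.
  @transient private lazy val jobId: NotSerializableId = new NotSerializableId(trackerId, 0)

  def describeTask(partitionId: Int): String = s"$jobId / partition $partitionId"
}
```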