diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala deleted file mode 100644 index 46e22b215b8ee..0000000000000 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.io.IOException -import java.text.NumberFormat -import java.text.SimpleDateFormat -import java.util.{Date, Locale} - -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.mapred._ -import org.apache.hadoop.mapreduce.TaskType - -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.SparkHadoopWriterUtils -import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.rdd.HadoopRDD -import org.apache.spark.util.SerializableJobConf - -/** - * Internal helper class that saves an RDD using a Hadoop OutputFormat. - * - * Saves the RDD using a JobConf, which should contain an output key class, an output value class, - * a filename to write to, etc, exactly like in a Hadoop MapReduce job. - */ -private[spark] -class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { - - private val now = new Date() - private val conf = new SerializableJobConf(jobConf) - - private var jobID = 0 - private var splitID = 0 - private var attemptID = 0 - private var jID: SerializableWritable[JobID] = null - private var taID: SerializableWritable[TaskAttemptID] = null - - @transient private var writer: RecordWriter[AnyRef, AnyRef] = null - @transient private var format: OutputFormat[AnyRef, AnyRef] = null - @transient private var committer: OutputCommitter = null - @transient private var jobContext: JobContext = null - @transient private var taskContext: TaskAttemptContext = null - - def preSetup() { - setIDs(0, 0, 0) - HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value) - - val jCtxt = getJobContext() - getOutputCommitter().setupJob(jCtxt) - } - - - def setup(jobid: Int, splitid: Int, attemptid: Int) { - setIDs(jobid, splitid, attemptid) - HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(now), - jobid, splitID, attemptID, conf.value) - } - - def open() { - val numfmt = NumberFormat.getInstance(Locale.US) - numfmt.setMinimumIntegerDigits(5) - numfmt.setGroupingUsed(false) - - val outputName = "part-" + numfmt.format(splitID) - val path = FileOutputFormat.getOutputPath(conf.value) - val fs: FileSystem = { - if (path != null) { - path.getFileSystem(conf.value) - } else { - FileSystem.get(conf.value) - } - } - - getOutputCommitter().setupTask(getTaskContext()) - writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL) - } - - def write(key: AnyRef, value: AnyRef) { - if (writer != null) { - writer.write(key, value) - } else { - throw new IOException("Writer is null, open() has not been called") - } - } - - def close() { - writer.close(Reporter.NULL) - } - - def commit() { - SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID) - } - - def commitJob() { - val cmtr = getOutputCommitter() - cmtr.commitJob(getJobContext()) - } - - // ********* Private Functions ********* - - private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = { - if (format == null) { - format = conf.value.getOutputFormat() - .asInstanceOf[OutputFormat[AnyRef, AnyRef]] - } - format - } - - private def getOutputCommitter(): OutputCommitter = { - if (committer == null) { - committer = conf.value.getOutputCommitter - } - committer - } - - private def getJobContext(): JobContext = { - if (jobContext == null) { - jobContext = new JobContextImpl(conf.value, jID.value) - } - jobContext - } - - private def getTaskContext(): TaskAttemptContext = { - if (taskContext == null) { - taskContext = newTaskAttemptContext(conf.value, taID.value) - } - taskContext - } - - protected def newTaskAttemptContext( - conf: JobConf, - attemptId: TaskAttemptID): TaskAttemptContext = { - new TaskAttemptContextImpl(conf, attemptId) - } - - private def setIDs(jobid: Int, splitid: Int, attemptid: Int) { - jobID = jobid - splitID = splitid - attemptID = attemptid - - jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid)) - taID = new SerializableWritable[TaskAttemptID]( - new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) - } -} diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala new file mode 100644 index 0000000000000..ddbd624b380d4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapred._ +import org.apache.hadoop.mapreduce.{TaskAttemptContext => NewTaskAttemptContext} + +/** + * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter + * (from the old mapred API). + * + * Unlike Hadoop's OutputCommitter, this implementation is serializable. + */ +class HadoopMapRedCommitProtocol(jobId: String, path: String) + extends HadoopMapReduceCommitProtocol(jobId, path) { + + override def setupCommitter(context: NewTaskAttemptContext): OutputCommitter = { + val config = context.getConfiguration.asInstanceOf[JobConf] + config.getOutputCommitter + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala new file mode 100644 index 0000000000000..805415203a156 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce._ + +import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} + +/** + * Interface for create output format/committer/writer used during saving an RDD using a Hadoop + * OutputFormat (both from the old mapred API and the new mapreduce API) + * + * Notes: + * 1. Implementations should throw [[IllegalArgumentException]] when wrong hadoop API is + * referenced; + * 2. Implementations must be serializable, as the instance instantiated on the driver + * will be used for tasks on executors; + * 3. Implementations should have a constructor with exactly one argument: + * (conf: SerializableConfiguration) or (conf: SerializableJobConf). + */ +abstract class HadoopWriteConfigUtil[K, V: ClassTag] extends Serializable { + + // -------------------------------------------------------------------------- + // Create JobContext/TaskAttemptContext + // -------------------------------------------------------------------------- + + def createJobContext(jobTrackerId: String, jobId: Int): JobContext + + def createTaskAttemptContext( + jobTrackerId: String, + jobId: Int, + splitId: Int, + taskAttemptId: Int): TaskAttemptContext + + // -------------------------------------------------------------------------- + // Create committer + // -------------------------------------------------------------------------- + + def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol + + // -------------------------------------------------------------------------- + // Create writer + // -------------------------------------------------------------------------- + + def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit + + def write(pair: (K, V)): Unit + + def closeWriter(taskContext: TaskAttemptContext): Unit + + // -------------------------------------------------------------------------- + // Create OutputFormat + // -------------------------------------------------------------------------- + + def initOutputFormat(jobContext: JobContext): Unit + + // -------------------------------------------------------------------------- + // Verify hadoop config + // -------------------------------------------------------------------------- + + def assertConf(): Unit + + def checkOutputSpecs(jobContext: JobContext): Unit + +} diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala deleted file mode 100644 index aaeb3d003829a..0000000000000 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.internal.io - -import java.text.SimpleDateFormat -import java.util.{Date, Locale} - -import scala.reflect.ClassTag -import scala.util.DynamicVariable - -import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapred.{JobConf, JobID} -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl - -import org.apache.spark.{SparkConf, SparkException, TaskContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.rdd.RDD -import org.apache.spark.util.{SerializableConfiguration, Utils} - -/** - * A helper object that saves an RDD using a Hadoop OutputFormat - * (from the newer mapreduce API, not the old mapred API). - */ -private[spark] -object SparkHadoopMapReduceWriter extends Logging { - - /** - * Basic work flow of this command is: - * 1. Driver side setup, prepare the data source and hadoop configuration for the write job to - * be issued. - * 2. Issues a write job consists of one or more executor side tasks, each of which writes all - * rows within an RDD partition. - * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any - * exception is thrown during task commitment, also aborts that task. - * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is - * thrown during job commitment, also aborts the job. - */ - def write[K, V: ClassTag]( - rdd: RDD[(K, V)], - hadoopConf: Configuration): Unit = { - // Extract context and configuration from RDD. - val sparkContext = rdd.context - val stageId = rdd.id - val sparkConf = rdd.conf - val conf = new SerializableConfiguration(hadoopConf) - - // Set up a job. - val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date()) - val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0) - val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) - val format = jobContext.getOutputFormatClass - - if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) { - // FileOutputFormat ignores the filesystem parameter - val jobFormat = format.newInstance - jobFormat.checkOutputSpecs(jobContext) - } - - val committer = FileCommitProtocol.instantiate( - className = classOf[HadoopMapReduceCommitProtocol].getName, - jobId = stageId.toString, - outputPath = conf.value.get("mapred.output.dir"), - isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] - committer.setupJob(jobContext) - - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - if (SparkHadoopWriterUtils.isSpeculationEnabled(sparkConf) && committer.isDirectOutput) { - val warningMessage = - s"$committer may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use an output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." - logWarning(warningMessage) - } - - // Try to write all RDD partitions as a Hadoop OutputFormat. - try { - val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { - executeTask( - context = context, - jobTrackerId = jobTrackerId, - sparkStageId = context.stageId, - sparkPartitionId = context.partitionId, - sparkAttemptNumber = context.attemptNumber, - committer = committer, - hadoopConf = conf.value, - outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]], - iterator = iter) - }) - - committer.commitJob(jobContext, ret) - logInfo(s"Job ${jobContext.getJobID} committed.") - } catch { - case cause: Throwable => - logError(s"Aborting job ${jobContext.getJobID}.", cause) - committer.abortJob(jobContext) - throw new SparkException("Job aborted.", cause) - } - } - - /** Write an RDD partition out in a single Spark task. */ - private def executeTask[K, V: ClassTag]( - context: TaskContext, - jobTrackerId: String, - sparkStageId: Int, - sparkPartitionId: Int, - sparkAttemptNumber: Int, - committer: FileCommitProtocol, - hadoopConf: Configuration, - outputFormat: Class[_ <: OutputFormat[K, V]], - iterator: Iterator[(K, V)]): TaskCommitMessage = { - // Set up a task. - val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE, - sparkPartitionId, sparkAttemptNumber) - val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId) - committer.setupTask(taskContext) - - val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - SparkHadoopWriterUtils.initHadoopOutputMetrics(context) - - // Initiate the writer. - val taskFormat = outputFormat.newInstance() - // If OutputFormat is Configurable, we should set conf to it. - taskFormat match { - case c: Configurable => c.setConf(hadoopConf) - case _ => () - } - val writer = taskFormat.getRecordWriter(taskContext) - .asInstanceOf[RecordWriter[K, V]] - require(writer != null, "Unable to obtain RecordWriter") - var recordsWritten = 0L - - // Write all rows in RDD partition. - try { - val ret = Utils.tryWithSafeFinallyAndFailureCallbacks { - while (iterator.hasNext) { - val pair = iterator.next() - writer.write(pair._1, pair._2) - - // Update bytes written metric every few records - SparkHadoopWriterUtils.maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback, recordsWritten) - recordsWritten += 1 - } - - committer.commitTask(taskContext) - }(catchBlock = { - committer.abortTask(taskContext) - logError(s"Task ${taskContext.getTaskAttemptID} aborted.") - }, finallyBlock = writer.close(taskContext)) - - outputMetricsAndBytesWrittenCallback.foreach { - case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } - - ret - } catch { - case t: Throwable => - throw new SparkException("Task failed while writing rows", t) - } - } -} - -private[spark] -object SparkHadoopWriterUtils { - - private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 - - def createJobID(time: Date, id: Int): JobID = { - val jobtrackerID = createJobTrackerID(time) - new JobID(jobtrackerID, id) - } - - def createJobTrackerID(time: Date): String = { - new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time) - } - - def createPathFromString(path: String, conf: JobConf): Path = { - if (path == null) { - throw new IllegalArgumentException("Output path is null") - } - val outputPath = new Path(path) - val fs = outputPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException("Incorrectly formatted output path") - } - outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } - - // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation - // setting can take effect: - def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = { - val validationDisabled = disableOutputSpecValidation.value - val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true) - enabledInConf && !validationDisabled - } - - def isSpeculationEnabled(conf: SparkConf): Boolean = { - conf.getBoolean("spark.speculation", false) - } - - // TODO: these don't seem like the right abstractions. - // We should abstract the duplicate code in a less awkward way. - - // return type: (output metrics, bytes written callback), defined only if the latter is defined - def initHadoopOutputMetrics( - context: TaskContext): Option[(OutputMetrics, () => Long)] = { - val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() - bytesWrittenCallback.map { b => - (context.taskMetrics().outputMetrics, b) - } - } - - def maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)], - recordsWritten: Long): Unit = { - if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { - outputMetricsAndBytesWrittenCallback.foreach { - case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } - } - } - - /** - * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case - * basis; see SPARK-4835 for more details. - */ - val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) -} diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala new file mode 100644 index 0000000000000..2d1aeff7b3151 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import java.text.NumberFormat +import java.util.{Date, Locale} + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.mapred._ +import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttemptContext, TaskAttemptID => NewTaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemptContextImpl} + +import org.apache.spark.{SerializableWritable, SparkException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.OutputMetrics +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.rdd.{HadoopRDD, RDD} +import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} + +/** + * A helper object that saves an RDD using a Hadoop OutputFormat + * (from the old mapred API). + */ +private[spark] +object SparkHadoopWriter extends Logging { + import SparkHadoopWriterUtils._ + + /** + * Basic work flow of this command is: + * 1. Driver side setup, prepare the data source and hadoop configuration for the write job to + * be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + */ + def write[K, V: ClassTag]( + rdd: RDD[(K, V)], + config: HadoopWriteConfigUtil[K, V]): Unit = { + // Extract context and configuration from RDD. + val sparkContext = rdd.context + val stageId = rdd.id + val sparkConf = rdd.conf + + // Set up a job. + val jobTrackerId = createJobTrackerID(new Date()) + val jobContext = config.createJobContext(jobTrackerId, stageId) + config.initOutputFormat(jobContext) + + // Assert the output format/key/value class is set in JobConf. + config.assertConf() + + if (isOutputSpecValidationEnabled(sparkConf)) { + // FileOutputFormat ignores the filesystem parameter + config.checkOutputSpecs(jobContext) + } + + val committer = config.createCommitter(stageId) + committer.setupJob(jobContext) + + // When speculation is on and output committer class name contains "Direct", we should warn + // users that they may loss data if they are using a direct output committer. + // There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad + // result of using direct output committer with speculation enabled. + if (isSpeculationEnabled(sparkConf) && committer.isDirectOutput) { + val warningMessage = + s"$committer may be an output committer that writes data directly to " + + "the final location. Because speculation is enabled, this output committer may " + + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + + "committer that does not have this behavior (e.g. FileOutputCommitter)." + logWarning(warningMessage) + } + + // Try to write all RDD partitions as a Hadoop OutputFormat. + try { + val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { + executeTask( + context = context, + config = config, + jobTrackerId = jobTrackerId, + sparkStageId = context.stageId, + sparkPartitionId = context.partitionId, + sparkAttemptNumber = context.attemptNumber, + committer = committer, + iterator = iter) + }) + + committer.commitJob(jobContext, ret) + logInfo(s"Job ${jobContext.getJobID} committed.") + } catch { + case cause: Throwable => + logError(s"Aborting job ${jobContext.getJobID}.", cause) + committer.abortJob(jobContext) + throw new SparkException("Job aborted.", cause) + } + } + + /** Write a RDD partition out in a single Spark task. */ + private def executeTask[K, V: ClassTag]( + context: TaskContext, + config: HadoopWriteConfigUtil[K, V], + jobTrackerId: String, + sparkStageId: Int, + sparkPartitionId: Int, + sparkAttemptNumber: Int, + committer: FileCommitProtocol, + iterator: Iterator[(K, V)]): TaskCommitMessage = { + // Set up a task. + val taskContext = config.createTaskAttemptContext( + jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber) + committer.setupTask(taskContext) + + val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = + initHadoopOutputMetrics(context) + + // Initiate the writer. + config.initWriter(taskContext, sparkPartitionId) + var recordsWritten = 0L + + // Write all rows in RDD partition. + try { + val ret = Utils.tryWithSafeFinallyAndFailureCallbacks { + while (iterator.hasNext) { + val pair = iterator.next() + config.write(pair) + + // Update bytes written metric every few records + maybeUpdateOutputMetrics( + outputMetricsAndBytesWrittenCallback, recordsWritten) + recordsWritten += 1 + } + + committer.commitTask(taskContext) + }(catchBlock = { + committer.abortTask(taskContext) + logError(s"Task ${taskContext.getTaskAttemptID} aborted.") + }, finallyBlock = config.closeWriter(taskContext)) + + outputMetricsAndBytesWrittenCallback.foreach { + case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) + } + + ret + } catch { + case t: Throwable => + throw new SparkException("Task failed while writing rows", t) + } + } +} + +/** + * A helper class that reads JobConf from older mapred API, creates output Format/Committer/Writer. + */ +private[spark] +class HadoopMapRedWriteConfigUtil[K, V: ClassTag](conf: SerializableJobConf) + extends HadoopWriteConfigUtil[K, V] with Logging { + + private var outputFormat: Class[_ <: OutputFormat[K, V]] = null + private var writer: RecordWriter[K, V] = null + + private def getConf(): JobConf = conf.value + + // -------------------------------------------------------------------------- + // Create JobContext/TaskAttemptContext + // -------------------------------------------------------------------------- + + def createJobContext(jobTrackerId: String, jobId: Int): NewJobContext = { + val jobAttemptId = new SerializableWritable(new JobID(jobTrackerId, jobId)) + new JobContextImpl(getConf(), jobAttemptId.value) + } + + def createTaskAttemptContext( + jobTrackerId: String, + jobId: Int, + splitId: Int, + taskAttemptId: Int): NewTaskAttemptContext = { + // Update JobConf. + HadoopRDD.addLocalConfiguration(jobTrackerId, jobId, splitId, taskAttemptId, conf.value) + // Create taskContext. + val attemptId = new TaskAttemptID(jobTrackerId, jobId, TaskType.MAP, splitId, taskAttemptId) + new TaskAttemptContextImpl(getConf(), attemptId) + } + + // -------------------------------------------------------------------------- + // Create committer + // -------------------------------------------------------------------------- + + def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol = { + // Update JobConf. + HadoopRDD.addLocalConfiguration("", 0, 0, 0, getConf()) + // Create commit protocol. + FileCommitProtocol.instantiate( + className = classOf[HadoopMapRedCommitProtocol].getName, + jobId = jobId.toString, + outputPath = getConf().get("mapred.output.dir"), + isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] + } + + // -------------------------------------------------------------------------- + // Create writer + // -------------------------------------------------------------------------- + + def initWriter(taskContext: NewTaskAttemptContext, splitId: Int): Unit = { + val numfmt = NumberFormat.getInstance(Locale.US) + numfmt.setMinimumIntegerDigits(5) + numfmt.setGroupingUsed(false) + + val outputName = "part-" + numfmt.format(splitId) + val path = FileOutputFormat.getOutputPath(getConf()) + val fs: FileSystem = { + if (path != null) { + path.getFileSystem(getConf()) + } else { + FileSystem.get(getConf()) + } + } + + writer = getConf().getOutputFormat + .getRecordWriter(fs, getConf(), outputName, Reporter.NULL) + .asInstanceOf[RecordWriter[K, V]] + + require(writer != null, "Unable to obtain RecordWriter") + } + + def write(pair: (K, V)): Unit = { + require(writer != null, "Must call createWriter before write.") + writer.write(pair._1, pair._2) + } + + def closeWriter(taskContext: NewTaskAttemptContext): Unit = { + if (writer != null) { + writer.close(Reporter.NULL) + } + } + + // -------------------------------------------------------------------------- + // Create OutputFormat + // -------------------------------------------------------------------------- + + def initOutputFormat(jobContext: NewJobContext): Unit = { + if (outputFormat == null) { + outputFormat = getConf().getOutputFormat.getClass + .asInstanceOf[Class[_ <: OutputFormat[K, V]]] + } + } + + private def getOutputFormat(): OutputFormat[K, V] = { + require(outputFormat != null, "Must call initOutputFormat first.") + + outputFormat.newInstance() + } + + // -------------------------------------------------------------------------- + // Verify hadoop config + // -------------------------------------------------------------------------- + + def assertConf(): Unit = { + val outputFormatInstance = getOutputFormat() + val keyClass = getConf().getOutputKeyClass + val valueClass = getConf().getOutputValueClass + if (outputFormatInstance == null) { + throw new SparkException("Output format class not set") + } + if (keyClass == null) { + throw new SparkException("Output key class not set") + } + if (valueClass == null) { + throw new SparkException("Output value class not set") + } + SparkHadoopUtil.get.addCredentials(getConf()) + + logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + + valueClass.getSimpleName + ")") + } + + def checkOutputSpecs(jobContext: NewJobContext): Unit = { + val ignoredFs = FileSystem.get(getConf()) + getOutputFormat().checkOutputSpecs(ignoredFs, getConf()) + } + +} + +/** + * A helper class that reads Configuration from newer mapreduce API, creates output + * Format/Committer/Writer. + */ +private[spark] +class HadoopMapReduceWriteConfigUtil[K, V: ClassTag](conf: SerializableConfiguration) + extends HadoopWriteConfigUtil[K, V] with Logging { + + private var outputFormat: Class[_ <: NewOutputFormat[K, V]] = null + private var writer: NewRecordWriter[K, V] = null + + private def getConf(): Configuration = conf.value + + // -------------------------------------------------------------------------- + // Create JobContext/TaskAttemptContext + // -------------------------------------------------------------------------- + + def createJobContext(jobTrackerId: String, jobId: Int): NewJobContext = { + val jobAttemptId = new NewTaskAttemptID(jobTrackerId, jobId, TaskType.MAP, 0, 0) + new NewTaskAttemptContextImpl(getConf(), jobAttemptId) + } + + def createTaskAttemptContext( + jobTrackerId: String, + jobId: Int, + splitId: Int, + taskAttemptId: Int): NewTaskAttemptContext = { + val attemptId = new NewTaskAttemptID( + jobTrackerId, jobId, TaskType.REDUCE, splitId, taskAttemptId) + new NewTaskAttemptContextImpl(getConf(), attemptId) + } + + // -------------------------------------------------------------------------- + // Create committer + // -------------------------------------------------------------------------- + + def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol = { + FileCommitProtocol.instantiate( + className = classOf[HadoopMapReduceCommitProtocol].getName, + jobId = jobId.toString, + outputPath = getConf().get("mapred.output.dir"), + isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] + } + + // -------------------------------------------------------------------------- + // Create writer + // -------------------------------------------------------------------------- + + def initWriter(taskContext: NewTaskAttemptContext, splitId: Int): Unit = { + val taskFormat = getOutputFormat() + // If OutputFormat is Configurable, we should set conf to it. + taskFormat match { + case c: Configurable => c.setConf(getConf()) + case _ => () + } + + writer = taskFormat.getRecordWriter(taskContext) + .asInstanceOf[NewRecordWriter[K, V]] + + require(writer != null, "Unable to obtain RecordWriter") + } + + def write(pair: (K, V)): Unit = { + require(writer != null, "Must call createWriter before write.") + writer.write(pair._1, pair._2) + } + + def closeWriter(taskContext: NewTaskAttemptContext): Unit = { + if (writer != null) { + writer.close(taskContext) + } + } + + // -------------------------------------------------------------------------- + // Create OutputFormat + // -------------------------------------------------------------------------- + + def initOutputFormat(jobContext: NewJobContext): Unit = { + if (outputFormat == null) { + outputFormat = jobContext.getOutputFormatClass + .asInstanceOf[Class[_ <: NewOutputFormat[K, V]]] + } + } + + private def getOutputFormat(): NewOutputFormat[K, V] = { + require(outputFormat != null, "Must call initOutputFormat first.") + + outputFormat.newInstance() + } + + // -------------------------------------------------------------------------- + // Verify hadoop config + // -------------------------------------------------------------------------- + + def assertConf(): Unit = { + // Do nothing for mapreduce API. + } + + def checkOutputSpecs(jobContext: NewJobContext): Unit = { + getOutputFormat().checkOutputSpecs(jobContext) + } + +} diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala new file mode 100644 index 0000000000000..06e6d3acea601 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import java.text.SimpleDateFormat +import java.util.{Date, Locale} + +import scala.util.DynamicVariable + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.{JobConf, JobID} + +import org.apache.spark.{SparkConf, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.OutputMetrics + +/** + * A helper object that provide common utils used during saving an RDD using a Hadoop OutputFormat + * (both from the old mapred API and the new mapreduce API) + */ +private[spark] +object SparkHadoopWriterUtils { + + private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 + + /** + * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case + * basis; see SPARK-4835 for more details. + */ + val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) + + def createJobID(time: Date, id: Int): JobID = { + val jobtrackerID = createJobTrackerID(time) + new JobID(jobtrackerID, id) + } + + def createJobTrackerID(time: Date): String = { + new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time) + } + + def createPathFromString(path: String, conf: JobConf): Path = { + if (path == null) { + throw new IllegalArgumentException("Output path is null") + } + val outputPath = new Path(path) + val fs = outputPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException("Incorrectly formatted output path") + } + outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + } + + // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation + // setting can take effect: + def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = { + val validationDisabled = disableOutputSpecValidation.value + val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true) + enabledInConf && !validationDisabled + } + + def isSpeculationEnabled(conf: SparkConf): Boolean = { + conf.getBoolean("spark.speculation", false) + } + + // TODO: these don't seem like the right abstractions. + // We should abstract the duplicate code in a less awkward way. + + // return type: (output metrics, bytes written callback), defined only if the latter is defined + def initHadoopOutputMetrics( + context: TaskContext): Option[(OutputMetrics, () => Long)] = { + val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() + bytesWrittenCallback.map { b => + (context.taskMetrics().outputMetrics, b) + } + } + + def maybeUpdateOutputMetrics( + outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)], + recordsWritten: Long): Unit = { + if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { + outputMetricsAndBytesWrittenCallback.foreach { + case (om, callback) => + om.setBytesWritten(callback()) + om.setRecordsWritten(recordsWritten) + } + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 33e695ec5322b..1f9eb7404dedb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -27,7 +27,6 @@ import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} @@ -36,13 +35,11 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.executor.OutputMetrics -import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils} +import org.apache.spark.internal.io._ import org.apache.spark.internal.Logging import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer -import org.apache.spark.util.Utils +import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} import org.apache.spark.util.collection.CompactBuffer import org.apache.spark.util.random.StratifiedSamplingUtils @@ -1016,11 +1013,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. - * - * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do - * not use output committer that writes data directly. - * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad - * result of using direct output committer with speculation enabled. */ def saveAsHadoopFile( path: String, @@ -1047,19 +1039,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) } - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - val speculationEnabled = self.conf.getBoolean("spark.speculation", false) - val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") - if (speculationEnabled && outputCommitterClass.contains("Direct")) { - val warningMessage = - s"$outputCommitterClass may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use an output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." - logWarning(warningMessage) - } - FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriterUtils.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf) @@ -1070,16 +1049,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Configuration object for that storage system. The Conf should set an OutputFormat and any * output paths required (e.g. a table name to write to) in the same way as it would be * configured for a Hadoop MapReduce job. - * - * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do - * not use output committer that writes data directly. - * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad - * result of using direct output committer with speculation enabled. */ def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { - SparkHadoopMapReduceWriter.write( + val config = new HadoopMapReduceWriteConfigUtil[K, V](new SerializableConfiguration(conf)) + SparkHadoopWriter.write( rdd = self, - hadoopConf = conf) + config = config) } /** @@ -1089,66 +1064,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * MapReduce job. */ def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope { - // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). - val hadoopConf = conf - val outputFormatInstance = hadoopConf.getOutputFormat - val keyClass = hadoopConf.getOutputKeyClass - val valueClass = hadoopConf.getOutputValueClass - if (outputFormatInstance == null) { - throw new SparkException("Output format class not set") - } - if (keyClass == null) { - throw new SparkException("Output key class not set") - } - if (valueClass == null) { - throw new SparkException("Output value class not set") - } - SparkHadoopUtil.get.addCredentials(hadoopConf) - - logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + - valueClass.getSimpleName + ")") - - if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) { - // FileOutputFormat ignores the filesystem parameter - val ignoredFs = FileSystem.get(hadoopConf) - hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) - } - - val writer = new SparkHadoopWriter(hadoopConf) - writer.preSetup() - - val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt - - val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] = - SparkHadoopWriterUtils.initHadoopOutputMetrics(context) - - writer.setup(context.stageId, context.partitionId, taskAttemptId) - writer.open() - var recordsWritten = 0L - - Utils.tryWithSafeFinallyAndFailureCallbacks { - while (iter.hasNext) { - val record = iter.next() - writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) - - // Update bytes written metric every few records - SparkHadoopWriterUtils.maybeUpdateOutputMetrics( - outputMetricsAndBytesWrittenCallback, recordsWritten) - recordsWritten += 1 - } - }(finallyBlock = writer.close()) - writer.commit() - outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) => - om.setBytesWritten(callback()) - om.setRecordsWritten(recordsWritten) - } - } - - self.context.runJob(self, writeToFile) - writer.commitJob() + val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf)) + SparkHadoopWriter.write( + rdd = self, + config = config) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 02df157be377c..44dd955ce8690 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -561,7 +561,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { pairs.saveAsHadoopFile( "ignored", pairs.keyClass, pairs.valueClass, classOf[FakeFormatWithCallback], conf) } - assert(e.getMessage contains "failed to write") + assert(e.getCause.getMessage contains "failed to write") assert(FakeWriterWithCallback.calledBy === "write,callback,close") assert(FakeWriterWithCallback.exception != null, "exception should be captured") diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 83288db92bb43..dcce5a6c0369a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -18,12 +18,14 @@ package org.apache.spark.scheduler import java.io.File +import java.util.Date import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.language.postfixOps -import org.apache.hadoop.mapred.{JobConf, OutputCommitter, TaskAttemptContext, TaskAttemptID} +import org.apache.hadoop.mapred._ +import org.apache.hadoop.mapreduce.TaskType import org.mockito.Matchers import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -31,6 +33,7 @@ import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter import org.apache.spark._ +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapRedCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.rdd.{FakeOutputCommitter, RDD} import org.apache.spark.util.{ThreadUtils, Utils} @@ -197,6 +200,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { */ private case class OutputCommitFunctions(tempDirPath: String) { + private val jobId = new SerializableWritable(SparkHadoopWriterUtils.createJobID(new Date, 0)) + // Mock output committer that simulates a successful commit (after commit is authorized) private def successfulOutputCommitter = new FakeOutputCommitter { override def commitTask(context: TaskAttemptContext): Unit = { @@ -229,14 +234,21 @@ private case class OutputCommitFunctions(tempDirPath: String) { def jobConf = new JobConf { override def getOutputCommitter(): OutputCommitter = outputCommitter } - val sparkHadoopWriter = new SparkHadoopWriter(jobConf) { - override def newTaskAttemptContext( - conf: JobConf, - attemptId: TaskAttemptID): TaskAttemptContext = { - mock(classOf[TaskAttemptContext]) - } - } - sparkHadoopWriter.setup(ctx.stageId, ctx.partitionId, ctx.attemptNumber) - sparkHadoopWriter.commit() + + // Instantiate committer. + val committer = FileCommitProtocol.instantiate( + className = classOf[HadoopMapRedCommitProtocol].getName, + jobId = jobId.value.getId.toString, + outputPath = jobConf.get("mapred.output.dir"), + isAppend = false) + + // Create TaskAttemptContext. + val taskAttemptId = (ctx.taskAttemptId % Int.MaxValue).toInt + val attemptId = new TaskAttemptID( + new TaskID(jobId.value, TaskType.MAP, ctx.partitionId), taskAttemptId) + val taskContext = new TaskAttemptContextImpl(jobConf, attemptId) + + committer.setupTask(taskContext) + committer.commitTask(taskContext) } }