diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
new file mode 100644
index 000000000000..ca9f7219de73
--- /dev/null
+++ b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapred
+
+trait HadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+}
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
new file mode 100644
index 000000000000..de7b0f81e380
--- /dev/null
+++ b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,9 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+
+trait HadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
new file mode 100644
index 000000000000..35300cea5883
--- /dev/null
+++ b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapred
+
+trait HadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
new file mode 100644
index 000000000000..7afdbff3205c
--- /dev/null
+++ b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,10 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+import task.{TaskAttemptContextImpl, JobContextImpl}
+
+trait HadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+}
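The four new files above are the heart of the patch: each Hadoop-specific source tree supplies the same two traits, and shared code only ever calls newJobContext / newTaskAttemptContext instead of constructors that differ between Hadoop 1 (concrete JobContext / TaskAttemptContext classes) and Hadoop 2 (JobContextImpl / TaskAttemptContextImpl). A minimal usage sketch for the new-API trait; the class name and hard-coded IDs below are illustrative, not part of the patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{HadoopMapReduceUtil, JobID, TaskAttemptID}

// Mixing in the trait gives version-neutral factories: whichever src/hadoop*
// tree is on the compile path decides which context classes get instantiated.
class ContextFactoryExample extends HadoopMapReduceUtil {
  def makeContexts(conf: Configuration) = {
    val jobId = new JobID("20121101000000", 0)                         // illustrative jobtracker id
    val attemptId = new TaskAttemptID("20121101000000", 0, true, 0, 0) // "map" task 0, attempt 0
    (newJobContext(conf, jobId), newTaskAttemptContext(conf, attemptId))
  }
}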
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index 790603581fe6..59ff8e815695
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -20,7 +20,7 @@ import spark.SerializableWritable
  * contain an output key class, an output value class, a filename to write to, etc exactly like in
  * a Hadoop job.
  */
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
+class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
 
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -126,14 +126,14 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
 
   private def getJobContext(): JobContext = {
     if (jobContext == null) {
-      jobContext = new JobContext(conf.value, jID.value)
+      jobContext = newJobContext(conf.value, jID.value)
     }
     return jobContext
   }
 
   private def getTaskContext(): TaskAttemptContext = {
     if (taskContext == null) {
-      taskContext = new TaskAttemptContext(conf.value, taID.value)
+      taskContext = newTaskAttemptContext(conf.value, taID.value)
     }
     return taskContext
   }
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index cd42586aa6b6..fd1d84d0ab71
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -2,13 +2,7 @@ package spark
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.InputFormat
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.JobID
-import org.apache.hadoop.mapreduce.RecordReader
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.TaskAttemptID
+import org.apache.hadoop.mapreduce._
 
 import java.util.Date
 import java.text.SimpleDateFormat
@@ -26,7 +20,8 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K], valueClass: Class[V],
     @transient conf: Configuration)
-  extends RDD[(K, V)](sc) {
+  extends RDD[(K, V)](sc)
+  with HadoopMapReduceUtil {
 
   private val serializableConf = new SerializableWritable(conf)
 
@@ -41,7 +36,7 @@ class NewHadoopRDD[K, V](
   @transient
   private val splits_ : Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
-    val jobContext = new JobContext(serializableConf.value, jobId)
+    val jobContext = newJobContext(serializableConf.value, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Split](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
@@ -56,7 +51,7 @@ class NewHadoopRDD[K, V](
     val split = theSplit.asInstanceOf[NewHadoopSplit]
     val conf = serializableConf.value
     val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
-    val context = new TaskAttemptContext(serializableConf.value, attemptId)
+    val context = newTaskAttemptContext(serializableConf.value, attemptId)
     val format = inputFormatClass.newInstance
     val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
     reader.initialize(split.serializableHadoopSplit.value, context)
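For reference, the NewHadoopRDD changes above route every context handed to the new-API InputFormat through the mixed-in factory. The same pattern in a self-contained sketch (SplitLister and its plumbing are illustrative, not code from the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{HadoopMapReduceUtil, InputFormat, InputSplit, JobID}
import scala.collection.JavaConverters._

// Lists the input splits for any new-API InputFormat without ever naming a
// JobContext constructor, so it compiles against both Hadoop source trees.
class SplitLister[K, V](inputFormatClass: Class[_ <: InputFormat[K, V]])
  extends HadoopMapReduceUtil {

  def splits(conf: Configuration, jobId: JobID): Seq[InputSplit] = {
    val format = inputFormatClass.newInstance
    format.getSplits(newJobContext(conf, jobId)).asScala
  }
}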
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 63af78b6628d..36fc2a23ef99
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -29,11 +29,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
-import org.apache.hadoop.mapreduce.TaskAttemptID
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
 
 import SparkContext._
 
@@ -43,6 +39,7 @@ import SparkContext._
 class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     self: RDD[(K, V)])
   extends Logging
+  with HadoopMapReduceUtil
   with Serializable {
 
   def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
@@ -298,7 +295,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
       /* "reduce task" */
       val attemptId = new TaskAttemptID(jobtrackerID, stageId, false, context.splitId, context.attemptId)
-      val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outputFormatClass.newInstance
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
@@ -317,7 +314,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
        * setupJob/commitJob, so we just use a dummy "map" task.
        */
       val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0)
-      val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId)
+      val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
       val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
       jobCommitter.setupJob(jobTaskContext)
       val count = self.context.runJob(self, writeShard _).sum
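The saveAsNewAPIHadoopFile path changed above is the most involved caller: each output shard needs its own TaskAttemptContext plus an OutputCommitter driven through setupTask/commitTask. A condensed sketch of that per-shard protocol using the factory method (ShardWriter and the record iterator are illustrative, not the patch's code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{HadoopMapReduceUtil, OutputFormat, TaskAttemptID}

// Writes one shard of (key, value) records: set up the task, write, close,
// then let the committer move the output into place, as PairRDDFunctions does.
class ShardWriter[K, V](outputFormatClass: Class[_ <: OutputFormat[K, V]])
  extends HadoopMapReduceUtil {

  def writeShard(conf: Configuration, attemptId: TaskAttemptID, records: Iterator[(K, V)]) {
    val hadoopContext = newTaskAttemptContext(conf, attemptId)
    val format = outputFormatClass.newInstance
    val committer = format.getOutputCommitter(hadoopContext)
    committer.setupTask(hadoopContext)
    val writer = format.getRecordWriter(hadoopContext)
    records.foreach { case (k, v) => writer.write(k, v) }
    writer.close(hadoopContext)
    committer.commitTask(hadoopContext)
  }
}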
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index afd9a118bb8b..f482e2b99703
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -8,6 +8,11 @@ object SparkBuild extends Build {
   // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
   // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
   val HADOOP_VERSION = "0.20.205.0"
+  val HADOOP_MAJOR_VERSION = "1"
+
+  // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
+  //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
+  //val HADOOP_MAJOR_VERSION = "2"
 
   lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)
 
@@ -21,7 +26,7 @@ object SparkBuild extends Build {
 
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization := "org.spark-project",
-    version := "0.5.2-SNAPSHOT",
+    version := "0.5.3-SNAPSHOT",
     scalaVersion := "2.9.2",
     scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
@@ -107,7 +112,8 @@ object SparkBuild extends Build {
       "it.unimi.dsi" % "fastutil" % "6.4.2",
       "colt" % "colt" % "1.2.0",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
-    )
+    ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
+    unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
   ) ++ assemblySettings ++ extraAssemblySettings ++ Seq(test in assembly := {})
 
   def rootSettings = sharedSettings ++ Seq(
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index 0dc2176a282a..bc995a7f917d
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.5.2-SNAPSHOT
+   /___/ .__/\_,_/_/ /_/\_\   version 0.5.3-SNAPSHOT
      /_/
 """)
     import Properties._
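One build detail worth spelling out from the SparkBuild change: the extra hadoop-client dependency is appended with Option.toSeq, so the library list grows by exactly zero or one entry depending on HADOOP_MAJOR_VERSION, and unmanagedSourceDirectories then compiles the matching src/hadoop1 or src/hadoop2 tree. A tiny stand-alone illustration of the Option-to-Seq idiom (the values are made up):

object OptionalDependencyDemo extends App {
  val HADOOP_MAJOR_VERSION = "2"              // made-up value for the demo
  val base = Seq("fastutil", "colt", "mesos")
  // Some(x).toSeq is Seq(x) and None.toSeq is empty, so ++ appends
  // hadoop-client only when building against Hadoop 2.
  val deps = base ++ (if (HADOOP_MAJOR_VERSION == "2") Some("hadoop-client") else None).toSeq
  println(deps)                               // List(fastutil, colt, mesos, hadoop-client)
}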