@@ -0,0 +1,7 @@
package org.apache.hadoop.mapred

trait HadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)

def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
}
@@ -0,0 +1,9 @@
package org.apache.hadoop.mapreduce

import org.apache.hadoop.conf.Configuration

trait HadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)

def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
}
@@ -0,0 +1,7 @@
package org.apache.hadoop.mapred

trait HadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)

def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
}
@@ -0,0 +1,10 @@
package org.apache.hadoop.mapreduce

import org.apache.hadoop.conf.Configuration
import task.{TaskAttemptContextImpl, JobContextImpl}

trait HadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)

def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
}
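The four new files above form a small compatibility shim, declared inside Hadoop's own packages (presumably because some of the wrapped constructors are package-private). The first two hunks, which call the JobContext/TaskAttemptContext constructors directly, are the Hadoop 1 variants; the second two delegate to JobContextImpl/TaskAttemptContextImpl, the concrete classes Hadoop 2 supplies now that those context types are interfaces. The new file paths are not shown in this view, but the build change further down selects one source tree or the other. A minimal, hypothetical caller (illustrative only, not part of the patch; class and identifier names are made up) showing how code that mixes in the shim stays identical across Hadoop versions:

```scala
import org.apache.hadoop.mapred.{HadoopMapRedUtil, JobConf, JobID}

// Hypothetical caller: by mixing in the shim and calling newJobContext instead of a
// constructor, this class compiles unchanged whichever variant of HadoopMapRedUtil
// (Hadoop 1 or Hadoop 2) ends up on the source path.
class ContextFactoryDemo extends HadoopMapRedUtil {
  def describe(conf: JobConf): String = {
    val jobId = new JobID("demo-jobtracker", 1)   // (jtIdentifier, job number)
    val ctx = newJobContext(conf, jobId)          // resolved by the compiled-in trait
    "created context for job " + ctx.getJobID
  }
}
```

The Spark classes changed below (HadoopWriter, NewHadoopRDD, PairRDDFunctions) follow the same pattern: mix in the trait, call the factory methods, never name a concrete context class.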
6 changes: 3 additions & 3 deletions core/src/main/scala/spark/HadoopWriter.scala
@@ -20,7 +20,7 @@ import spark.SerializableWritable
* contain an output key class, an output value class, a filename to write to, etc exactly like in
* a Hadoop job.
*/
class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {

private val now = new Date()
private val conf = new SerializableWritable(jobConf)
@@ -126,14 +126,14 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {

private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = new JobContext(conf.value, jID.value)
jobContext = newJobContext(conf.value, jID.value)
}
return jobContext
}

private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
taskContext = new TaskAttemptContext(conf.value, taID.value)
taskContext = newTaskAttemptContext(conf.value, taID.value)
}
return taskContext
}
15 changes: 5 additions & 10 deletions core/src/main/scala/spark/NewHadoopRDD.scala
@@ -2,13 +2,7 @@ package spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce._

import java.util.Date
import java.text.SimpleDateFormat
@@ -26,7 +20,8 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K], valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc) {
extends RDD[(K, V)](sc)
with HadoopMapReduceUtil {

private val serializableConf = new SerializableWritable(conf)

@@ -41,7 +36,7 @@
@transient
private val splits_ : Array[Split] = {
val inputFormat = inputFormatClass.newInstance
val jobContext = new JobContext(serializableConf.value, jobId)
val jobContext = newJobContext(serializableConf.value, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Split](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -56,7 +51,7 @@
val split = theSplit.asInstanceOf[NewHadoopSplit]
val conf = serializableConf.value
val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
val context = new TaskAttemptContext(serializableConf.value, attemptId)
val context = newTaskAttemptContext(serializableConf.value, attemptId)
val format = inputFormatClass.newInstance
val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
reader.initialize(split.serializableHadoopSplit.value, context)
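A side note on the five-argument TaskAttemptID constructor used here and again in PairRDDFunctions below: the arguments are positional and the boolean is easy to misread. A short illustrative sketch (values made up, not taken from the patch):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptID

object TaskAttemptIdDemo {
  // TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
  // The boolean flags the attempt as a map-side task: NewHadoopRDD passes true for its
  // read attempts, while saveAsNewAPIHadoopFile passes false for its per-partition writes
  // and true only for the dummy job-level "map" attempt it uses for setupJob.
  val readAttempt  = new TaskAttemptID("201211010000", 1, true, 3, 0)   // map task 3, attempt 0
  val writeAttempt = new TaskAttemptID("201211010000", 1, false, 3, 0)  // reduce task 3, attempt 0
}
```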
11 changes: 4 additions & 7 deletions core/src/main/scala/spark/PairRDDFunctions.scala
@@ -29,11 +29,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}

import SparkContext._

@@ -43,6 +39,7 @@ import SparkContext._
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with HadoopMapReduceUtil
with Serializable {

def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
@@ -298,7 +295,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = new TaskAttemptID(jobtrackerID,
stageId, false, context.splitId, context.attemptId)
val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outputFormatClass.newInstance
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
@@ -317,7 +314,7 @@
* setupJob/commitJob, so we just use a dummy "map" task.
*/
val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0)
val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
val count = self.context.runJob(self, writeShard _).sum
10 changes: 8 additions & 2 deletions project/SparkBuild.scala
@@ -8,6 +8,11 @@ object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
val HADOOP_VERSION = "0.20.205.0"
val HADOOP_MAJOR_VERSION = "1"

// For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
//val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
//val HADOOP_MAJOR_VERSION = "2"

lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)

@@ -21,7 +26,7 @@

def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.5.2-SNAPSHOT",
version := "0.5.3-SNAPSHOT",
scalaVersion := "2.9.2",
scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
@@ -107,7 +112,8 @@ object SparkBuild extends Build {
"it.unimi.dsi" % "fastutil" % "6.4.2",
"colt" % "colt" % "1.2.0",
"org.apache.mesos" % "mesos" % "0.9.0-incubating"
)
) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
) ++ assemblySettings ++ extraAssemblySettings ++ Seq(test in assembly := {})

def rootSettings = sharedSettings ++ Seq(
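To make the effect of the two new build settings concrete, here is a small illustrative sketch (plain Scala rather than sbt; the directory names follow directly from the string concatenation in the setting, while the mapping to the shim variants is an assumption, since the new file paths are not shown in this view):

```scala
object HadoopBuildFlavorDemo {
  // Same expression as the unmanagedSourceDirectories setting above.
  def sourceDirFor(hadoopMajorVersion: String): String =
    "src/hadoop" + hadoopMajorVersion + "/scala"

  // Option widened to Seq so it can be appended to the fixed dependency list,
  // mirroring the "(if (...) Some(...) else None).toSeq" idiom in SparkBuild.scala.
  def extraDeps(hadoopMajorVersion: String, hadoopVersion: String): Seq[String] =
    (if (hadoopMajorVersion == "2")
       Some("org.apache.hadoop:hadoop-client:" + hadoopVersion)
     else None).toSeq

  // sourceDirFor("1") == "src/hadoop1/scala"  -- presumably the JobContext-based shims
  // sourceDirFor("2") == "src/hadoop2/scala"  -- presumably the JobContextImpl-based shims
  // extraDeps("2", "2.0.0-mr1-cdh4.1.1") adds hadoop-client; extraDeps("1", ...) adds nothing.
}
```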
2 changes: 1 addition & 1 deletion repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 0.5.2-SNAPSHOT
/___/ .__/\_,_/_/ /_/\_\ version 0.5.3-SNAPSHOT
/_/
""")
import Properties._