
Commit 7550dcb

Merge branch 'master' of git://git.apache.org/spark into additional-parquet-filter-testcases
2 parents: 9016933 + cb0e9b0

67 files changed: +2366 additions, -645 deletions


core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 30 additions & 0 deletions
@@ -244,6 +244,36 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   }
 }
 
+object AccumulatorParam {
+
+  // The following implicit objects were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, as there are duplicate codes in SparkContext for backward
+  // compatibility, please update them accordingly if you modify the following implicit objects.
+
+  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+    def zero(initialValue: Double) = 0.0
+  }
+
+  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+    def zero(initialValue: Int) = 0
+  }
+
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long) = t1 + t2
+    def zero(initialValue: Long) = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float) = t1 + t2
+    def zero(initialValue: Float) = 0f
+  }
+
+  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
+
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private object Accumulators {
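
The practical effect of the new companion object: the Scala compiler searches a type's companion object when resolving implicits, so the numeric accumulator params above are found without `import SparkContext._`. A minimal usage sketch against the 1.x API (the app name, master URL, and element count are illustrative, not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-example").setMaster("local[*]"))

    // IntAccumulatorParam is picked up from AccumulatorParam's companion object,
    // so no `import SparkContext._` is required here.
    val counter = sc.accumulator(0)

    sc.parallelize(1 to 100).foreach(_ => counter += 1)
    println(counter.value)  // 100 once the job has finished

    sc.stop()
  }
}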

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 122 additions & 43 deletions
@@ -83,6 +83,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // contains a map from hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
 
+  val startTime = System.currentTimeMillis()
+
   /**
    * Create a SparkContext that loads settings from system properties (for instance, when
    * launching with ./bin/spark-submit).
@@ -269,8 +271,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  val startTime = System.currentTimeMillis()
-
   // Add each JAR given through the constructor
   if (jars != null) {
     jars.foreach(addJar)
@@ -1624,47 +1624,74 @@ object SparkContext extends Logging {
 
   private[spark] val DRIVER_IDENTIFIER = "<driver>"
 
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+  // The following deprecated objects have already been copied to `object AccumulatorParam` to
+  // make the compiler find them automatically. They are duplicate codes only for backward
+  // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the
+  // following ones.
+
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
   }
 
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int) = 0
   }
 
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long) = t1 + t2
     def zero(initialValue: Long) = 0L
   }
 
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+  @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float) = t1 + t2
     def zero(initialValue: Float) = 0f
   }
 
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+  // The following deprecated functions have already been moved to `object RDD` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object RDD`.
 
-  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
       (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairRDDFunctions(rdd)
+    RDD.rddToPairRDDFunctions(rdd)
   }
 
-  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)
 
-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
       rdd: RDD[(K, V)]) =
-    new SequenceFileRDDFunctions(rdd)
+    RDD.rddToSequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
      rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions[K, V, (K, V)](rdd)
+    RDD.rddToOrderedRDDFunctions(rdd)
 
-  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)
 
-  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
-    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
+  @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
+    "kept here only for backward compatibility.", "1.2.0")
+  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+    RDD.numericRDDToDoubleRDDFunctions(rdd)
 
   // Implicit conversions to common Writable types, for saveAsSequenceFile
 
@@ -1690,40 +1717,49 @@ object SparkContext extends Logging {
     arr.map(x => anyToWritable(x)).toArray)
   }
 
-  // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
-      : WritableConverter[T] = {
-    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
-    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
-  }
+  // The following deprecated functions have already been moved to `object WritableConverter` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility
+  // and just call the corresponding functions in `object WritableConverter`.
 
-  implicit def intWritableConverter(): WritableConverter[Int] =
-    simpleWritableConverter[Int, IntWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def intWritableConverter(): WritableConverter[Int] =
+    WritableConverter.intWritableConverter()
 
-  implicit def longWritableConverter(): WritableConverter[Long] =
-    simpleWritableConverter[Long, LongWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def longWritableConverter(): WritableConverter[Long] =
+    WritableConverter.longWritableConverter()
 
-  implicit def doubleWritableConverter(): WritableConverter[Double] =
-    simpleWritableConverter[Double, DoubleWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def doubleWritableConverter(): WritableConverter[Double] =
+    WritableConverter.doubleWritableConverter()
 
-  implicit def floatWritableConverter(): WritableConverter[Float] =
-    simpleWritableConverter[Float, FloatWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def floatWritableConverter(): WritableConverter[Float] =
+    WritableConverter.floatWritableConverter()
 
-  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
-    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def booleanWritableConverter(): WritableConverter[Boolean] =
+    WritableConverter.booleanWritableConverter()
 
-  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
-      // getBytes method returns array which is longer then data to be returned
-      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
-    )
-  }
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def bytesWritableConverter(): WritableConverter[Array[Byte]] =
+    WritableConverter.bytesWritableConverter()
 
-  implicit def stringWritableConverter(): WritableConverter[String] =
-    simpleWritableConverter[String, Text](_.toString)
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def stringWritableConverter(): WritableConverter[String] =
+    WritableConverter.stringWritableConverter()
 
-  implicit def writableWritableConverter[T <: Writable]() =
-    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+  @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
+    "backward compatibility.", "1.2.0")
+  def writableWritableConverter[T <: Writable]() =
+    WritableConverter.writableWritableConverter()
 
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
@@ -1950,3 +1986,46 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassTag[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
+
+object WritableConverter {
+
+  // Helper objects for converting common types to Writable
+  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+      : WritableConverter[T] = {
+    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
+    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
+  }
+
+  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, we still keep the old functions in SparkContext for backward
+  // compatibility and forward to the following functions directly.
+
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
+  }
+
+  implicit def stringWritableConverter(): WritableConverter[String] =
+    simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverter[T <: Writable]() =
+    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+}
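
The same companion-object trick drives the RDD side of this change: `rddToPairRDDFunctions` and the other conversions now live in `object RDD`, so pair-RDD methods resolve without `import SparkContext._`, while the old SparkContext members remain only as deprecated forwarders. A hedged sketch of a post-1.2 call site (the input data and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object PairImplicitsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-example").setMaster("local[*]"))

    // reduceByKey comes from PairRDDFunctions; the implicit conversion is now
    // found in RDD's companion object rather than via `import SparkContext._`.
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.collect().foreach(println)
    sc.stop()
  }
}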

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -32,13 +32,13 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
+import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext._
+import org.apache.spark.AccumulatorParam._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 14 additions & 12 deletions
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import akka.serialization.Serialization
 
 import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
 
-import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +35,9 @@ import scala.reflect.ClassTag
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serializer)
+    val serialization: Serialization)
   extends PersistenceEngine with Logging {
 
-  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
   override def persist(name: String, obj: Object): Unit = {
@@ -56,25 +56,27 @@ private[spark] class FileSystemPersistenceEngine(
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-
-    val out = serializer.serializeStream(new FileOutputStream(file))
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    val out = new FileOutputStream(file)
     try {
-      out.writeObject(value)
+      out.write(serialized)
     } finally {
       out.close()
     }
-
   }
 
-  def deserializeFromFile[T](file: File): T = {
+  private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
    try {
      dis.readFully(fileData)
    } finally {
      dis.close()
    }
-
-    serializer.deserializeStream(dis).readObject()
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
   }
+
 }
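
The rewritten engine leans on Akka's Serialization extension instead of a Spark Serializer: serializeIntoFile asks the extension for a serializer matching the value and writes the toBinary output, while deserializeFromFile looks a serializer up by class and calls fromBinary. A minimal round-trip sketch of that API outside the Master (the actor-system name and the String payload are illustrative stand-ins for the persisted application/worker state):

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

object AkkaSerializationRoundTrip {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")                  // illustrative system name
    val serialization = SerializationExtension(system)  // same extension the Master now passes in

    val value: AnyRef = "worker-state"                  // stand-in for a persisted object
    val serializer = serialization.findSerializerFor(value)
    val bytes = serializer.toBinary(value)              // what serializeIntoFile writes to disk

    // what deserializeFromFile does with the bytes it reads back
    val restored = serialization.serializerFor(classOf[String]).fromBinary(bytes)
    println(restored == value)

    system.shutdown()                                   // Akka 2.x-style shutdown
  }
}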

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 8 additions & 4 deletions
@@ -30,6 +30,7 @@ import scala.util.Random
 import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,18 @@
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        val zkFactory =
+          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
         (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        val fsFactory =
+          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
         val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
-        val factory = clazz.getConstructor(conf.getClass)
-          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+          .newInstance(conf, SerializationExtension(context.system))
+          .asInstanceOf[StandaloneRecoveryModeFactory]
         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
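
Which branch of the match above runs is controlled by the Master's RECOVERY_MODE setting. A hedged configuration sketch for the FILESYSTEM path exercised by this commit (the directory is illustrative, and on a real deployment these keys are usually passed through SPARK_DAEMON_JAVA_OPTS rather than set programmatically):

import org.apache.spark.SparkConf

object RecoveryModeConfigSketch {
  val conf = new SparkConf()
    .set("spark.deploy.recoveryMode", "FILESYSTEM")               // ZOOKEEPER, FILESYSTEM, or CUSTOM
    .set("spark.deploy.recoveryDirectory", "/var/spark/recovery") // illustrative path

  // For the CUSTOM branch, the factory class name is read from
  // spark.deploy.recoveryMode.factory, as in the hunk above.
}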
