diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 2301caafb07f..79b90be5d80d 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -294,3 +294,78 @@ private object Accumulators {
   def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
   def stringifyValue(value: Any) = "%s".format(value)
 }
+
+private[spark] trait AccumulatorParamImplicits {
+
+  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+    def zero(initialValue: Double) = 0.0
+  }
+
+  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+    def zero(initialValue: Int) = 0
+  }
+
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long) = t1 + t2
+    def zero(initialValue: Long) = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float) = t1 + t2
+    def zero(initialValue: Float) = 0f
+  }
+
+  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
+
+
+private[spark] trait AccumulatorConstructors {
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `+=` method. Only the driver can access the accumulator's `value`.
+   */
+  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
+    new Accumulator(initialValue, param)
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
+   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
+   * driver can access the accumulator's `value`.
+   */
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+    new Accumulator(initialValue, param, Some(name))
+  }
+
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
+   * with `+=`. Only the driver can access the accumulable's `value`.
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
+   */
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
+    new Accumulable(initialValue, param)
+
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
+   * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver
+   * can access the accumulable's `value`.
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
+   */
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
+    new Accumulable(initialValue, param, Some(name))
+
+  /**
+   * Create an accumulator from a "mutable collection" type.
+   *
+   * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
+   * standard mutable collections. So you can use this with mutable Map, Set, etc.
+   */
+  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
+      (initialValue: R): Accumulable[R, T] = {
+    val param = new GrowableAccumulableParam[R, T]
+    new Accumulable(initialValue, param)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8b4db783979e..cf0ff23abe07 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -61,7 +61,9 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
-class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+class SparkContext(config: SparkConf) extends Logging
+  with AccumulatorConstructors
+  with SparkStatusAPI {
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -836,53 +838,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
   // Methods for creating shared variables
 
-  /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
-   * values to using the `+=` method. Only the driver can access the accumulator's `value`.
-   */
-  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
-    new Accumulator(initialValue, param)
-
-  /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
-   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
-   * driver can access the accumulator's `value`.
-   */
-  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
-    new Accumulator(initialValue, param, Some(name))
-  }
-
-  /**
-   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
-   * with `+=`. Only the driver can access the accumuable's `value`.
-   * @tparam R accumulator result type
-   * @tparam T type that can be added to the accumulator
-   */
-  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
-    new Accumulable(initialValue, param)
-
-  /**
-   * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
-   * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
-   * access the accumuable's `value`.
-   * @tparam R accumulator result type
-   * @tparam T type that can be added to the accumulator
-   */
-  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
-    new Accumulable(initialValue, param, Some(name))
-
-  /**
-   * Create an accumulator from a "mutable collection" type.
-   *
-   * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
-   * standard mutable collections. So you can use this with mutable Map, Set, etc.
-   */
-  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
-      (initialValue: R): Accumulable[R, T] = {
-    val param = new GrowableAccumulableParam[R,T]
-    new Accumulable(initialValue, param)
-  }
-
   /**
    * Broadcast a read-only variable to the cluster, returning a
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
@@ -1386,7 +1341,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
  * The SparkContext object contains a number of implicit conversions and parameters for use with
  * various Spark features.
  */
-object SparkContext extends Logging {
+object SparkContext extends Logging with AccumulatorParamImplicits {
 
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
@@ -1398,28 +1353,6 @@ object SparkContext extends Logging {
 
   private[spark] val DRIVER_IDENTIFIER = "<driver>"
 
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double) = 0.0
-  }
-
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int) = 0
-  }
-
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0L
-  }
-
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float) = t1 + t2
-    def zero(initialValue: Float) = 0f
-  }
-
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-
   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
       (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairRDDFunctions(rdd)
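
Since this change only moves code into mixin traits, the public API is unchanged and existing call sites keep compiling. A minimal usage sketch of the relocated accumulator constructors, assuming a running SparkContext named sc (the variable and the sample data are hypothetical, not part of this patch):

    // Hypothetical driver-side code. IntAccumulatorParam is resolved implicitly;
    // it now comes from AccumulatorParamImplicits, which object SparkContext mixes in.
    val negatives = sc.accumulator(0, "negatives")

    sc.parallelize(Seq(1, -2, 3, -4)).foreach { x =>
      if (x < 0) negatives += 1   // tasks can only add; they cannot read the value
    }

    println(negatives.value)      // only the driver can read the value; prints 2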
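
Likewise, a sketch of accumulableCollection: mutable.HashSet satisfies the Growable with TraversableOnce view bound, so R is inferred as mutable.HashSet[String] and T as String (hypothetical usage, same assumed sc):

    import scala.collection.mutable

    // HashSet provides += and ++=, which is all GrowableAccumulableParam needs.
    val seen = sc.accumulableCollection(mutable.HashSet[String]())
    sc.parallelize(Seq("a", "b", "a")).foreach(w => seen += w)
    println(seen.value)   // driver-only read; e.g. Set(a, b)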
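
For element types without a built-in param (the TODO above notes that lists and strings are still missing), callers can supply their own AccumulatorParam, which only requires addInPlace and zero. A hypothetical sketch for string concatenation, not part of this patch:

    // Hypothetical custom param, mirroring the built-in ones above.
    implicit object StringAccumulatorParam extends AccumulatorParam[String] {
      def addInPlace(t1: String, t2: String): String = t1 + t2
      def zero(initialValue: String): String = ""
    }

    // With the implicit in scope, the existing constructor picks it up.
    val trace = sc.accumulator("")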