diff --git a/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala b/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala new file mode 100644 index 0000000000..f0555b650e --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala @@ -0,0 +1,113 @@ +package com.twitter.scalding.typed + +import com.twitter.algebird._ + +/** + * Extension for TypedPipe to add a cumulativeSum method. + * Given a TypedPipe with T = (GroupField, (SortField, SummableField)) + * cumulativeSum will return a SortedGrouped with the SummableField accumulated + * according to the sort field. + * eg: + * ('San Francisco', (100, 100)), + * ('San Francisco', (101, 50)), + * ('San Francisco', (200, 200)), + * ('Vancouver', (100, 50)), + * ('Vancouver', (101, 300)), + * ('Vancouver', (200, 100)) + * becomes + * ('San Francisco', (100, 100)), + * ('San Francisco', (101, 150)), + * ('San Francisco', (200, 300)), + * ('Vancouver', (100, 50)), + * ('Vancouver', (101, 350)), + * ('Vancouver', (200, 450)) + * + * If you provide cumulativeSum a partition function you get the same result + * but you allow for more than one reducer per group. This is useful for + * when you have a single group that has a very large number of entries. + * For example in the previous example if you gave a partition function of the + * form { _ / 100 } then you would never have any one reducer deal with more + * than 2 entries. 
+ */ +object CumulativeSum { + implicit def toCumulativeSum[K, U, V](pipe: TypedPipe[(K, (U, V))]) = + new CumulativeSumExtension(pipe) + + class CumulativeSumExtension[K, U, V]( + val pipe: TypedPipe[(K, (U, V))]) { + /** Takes an ordering on the sort field and a semigroup on the values and returns the cumulative sum of the values **/ + def cumulativeSum( + implicit sg: Semigroup[V], ordU: Ordering[U], ordK: Ordering[K]): SortedGrouped[K, (U, V)] = { + pipe.group + .sortBy { case (u: U, _) => u } + .scanLeft(Nil: List[(U, V)]) { + case (acc, (u: U, v: V)) => + acc match { + case List((previousU, previousSum)) => List((u, sg.plus(previousSum, v))) + case _ => List((u, v)) + } + } + .flattenValues + } + /** + * An optimization of cumulativeSum for cases when a particular key has many + * entries. Requires a sortable partitioning of U. + * Accomplishes the optimization by not requiring all the entries for a + * single key to go through a single scan. Instead requires the sums of the + * partitions for a single key to go through a single scan. 
+ */ + def cumulativeSum[S](partition: U => S)( + implicit ordS: Ordering[S], + sg: Semigroup[V], + ordU: Ordering[U], + ordK: Ordering[K]): TypedPipe[(K, (U, V))] = { + + val sumPerS = pipe + .map { case (k, (u: U, v: V)) => (k, partition(u)) -> v } + .sumByKey + .map { case ((k, s), v) => (k, (s, v)) } + .group + .sortBy { case (s, v) => s } + .scanLeft(None: Option[(Option[V], V, S)]) { + case (acc, (s, v)) => + acc match { + case Some((previousPreviousSum, previousSum, previousS)) => { + Some((Some(previousSum), sg.plus(v, previousSum), s)) + } + case _ => Some((None, v, s)) + } + } + .flatMap{ + case (k, maybeAcc) => + for ( + acc <- maybeAcc; + previousSum <- acc._1 + ) yield { (k, acc._3) -> (None, previousSum) } + } + + val summands = pipe + .map { + case (k, (u: U, v: V)) => + (k, partition(u)) -> (Some(u), v) + } ++ sumPerS + + summands + .group + .sortBy { case (u, _) => u } + .scanLeft(None: Option[(Option[U], V)]) { + case (acc, (maybeU, v)) => + acc match { + case Some((_, previousSum)) => Some((maybeU, sg.plus(v, previousSum))) + case _ => Some((maybeU, v)) + } + } + .flatMap { + case ((k, s), acc) => + for (uv <- acc; u <- uv._1) yield { + (k, (u, uv._2)) + } + } + } + } +} + diff --git a/scalding-core/src/test/scala/com/twitter/scalding/CumulitiveSumTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/CumulitiveSumTest.scala new file mode 100644 index 0000000000..df76ab3fe9 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/CumulitiveSumTest.scala @@ -0,0 +1,93 @@ +package com.twitter.scalding + +import org.specs._ +import com.twitter.scalding._ + +import com.twitter.scalding.typed.CumulativeSum._ + +class AddRankingWithCumulativeSum(args: Args) extends Job(args) { + TypedPipe.from(TypedTsv[(String, Double)]("input1")) + .map { + case (gender, height) => + (gender, (height, 1L)) + } + .cumulativeSum + .map { + case (gender, (height, rank)) => + (gender, height, rank) + } + .write(TypedTsv("result1")) +} + +class 
AddRankingWithPartitionedCumulativeSum(args: Args) extends Job(args) { + TypedPipe.from(TypedTsv[(String, Double)]("input1")) + .map { + case (gender, height) => + (gender, (height, 1L)) + } + .cumulativeSum { h => (h / 100).floor.toLong } + .map { + case (gender, (height, rank)) => + (gender, height, rank) + } + .write(TypedTsv("result1")) +} + +class CumulativeSumTest1 extends Specification { + + // --- A simple ranking job + val sampleInput1 = List( + ("male", "165.2"), + ("female", "172.2"), + ("male", "184.1"), + ("male", "125.4"), + ("female", "128.6"), + ("male", "265.2"), + ("female", "272.2"), + ("male", "284.1"), + ("male", "225.4"), + ("female", "228.6")) + + // Each group sorted and ranking added highest person to shortest + val expectedOutput1 = Set( + ("male", 184.1, 3), + ("male", 165.2, 2), + ("male", 125.4, 1), + ("female", 172.2, 2), + ("female", 128.6, 1), + ("male", 284.1, 6), + ("male", 265.2, 5), + ("male", 225.4, 4), + ("female", 272.2, 4), + ("female", 228.6, 3)) + + "A simple ranking cumulative sum job" should { + JobTest("com.twitter.scalding.AddRankingWithCumulativeSum") + .source(TypedTsv[(String, Double)]("input1"), sampleInput1) + .sink[(String, Double, Long)](TypedTsv[(String, Double, Long)]("result1")) { outBuf1 => + "produce correct number of records when filtering out null values" in { + outBuf1.size must_== 10 + } + "create correct ranking per group, 1st being the heighest person of that group" in { + outBuf1.toSet must_== expectedOutput1 + } + } + .run + .finish + } + + "A partitioned ranking cumulative sum job" should { + JobTest("com.twitter.scalding.AddRankingWithPartitionedCumulativeSum") + .source(TypedTsv[(String, Double)]("input1"), sampleInput1) + .sink[(String, Double, Long)](TypedTsv[(String, Double, Long)]("result1")) { outBuf1 => + "produce correct number of records when filtering out null values" in { + outBuf1.size must_== 10 + } + "create correct ranking per group, 1st being the heighest person of that group" in { 
+ outBuf1.toSet must_== expectedOutput1 + } + } + .run + .finish + } +}