Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cumulative sum to KeyedList #1085

Merged
merged 1 commit into from
Nov 9, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.twitter.scalding.typed

import com.twitter.algebird._

/**
* Extension for TypedPipe to add a cumulativeSum method.
* Given a TypedPipe with T = (GroupField, (SortField, SummableField))
* cumulaitiveSum will return a SortedGrouped with the SummableField accumulated
* according to the sort field.
* eg:
* ('San Francisco', (100, 100)),
* ('San Francisco', (101, 50)),
* ('San Francisco', (200, 200)),
* ('Vancouver', (100, 50)),
* ('Vancouver', (101, 300)),
* ('Vancouver', (200, 100))
* becomes
* ('San Francisco', (100, 100)),
* ('San Francisco', (101, 150)),
* ('San Francisco', (200, 300)),
* ('Vancouver', (100, 50)),
* ('Vancouver', (101, 350)),
* ('Vancouver', (200, 450))
*
* If you provide cumulativeSum a partition function you get the same result
* but you allow for more than one reducer per group. This is useful for
* when you have a single group that has a very large number of entries.
* For example in the previous example if you gave a partition function of the
* form { _ / 100 } then you would never have any one reducer deal with more
* than 2 entries.
*/
object CumulativeSum {
implicit def toCumulativeSum[K, U, V](pipe: TypedPipe[(K, (U, V))]) =
new CumulativeSumExtension(pipe)

class CumulativeSumExtension[K, U, V](
val pipe: TypedPipe[(K, (U, V))]) {
/** Takes a sortable field and a monoid and returns the cumulative sum of that monoid **/
def cumulativeSum(
implicit sg: Semigroup[V], ordU: Ordering[U], ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
pipe.group
.sortBy { case (u: U, _) => u }
.scanLeft(Nil: List[(U, V)]) {
case (acc, (u: U, v: V)) =>
acc match {
case List((previousU, previousSum)) => List((u, sg.plus(previousSum, v)))
case _ => List((u, v))
}
}
.flattenValues
}
/**
* An optimization of cumulativeSum for cases when a particular key has many
* entries. Requires a sortable partitioning of U.
* Accomplishes the optimization by not requiring all the entries for a
* single key to go through a single scan. Instead requires the sums of the
* partitions for a single key to go through a single scan.
*/
def cumulativeSum[S](partition: U => S)(
implicit ordS: Ordering[S],
sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {

val sumPerS = pipe
.map { case (k, (u: U, v: V)) => (k, partition(u)) -> v }
.sumByKey
.map { case ((k, s), v) => (k, (s, v)) }
.group
.sortBy { case (s, v) => s }
.scanLeft(None: Option[(Option[V], V, S)]) {
case (acc, (s, v)) =>
acc match {
case Some((previousPreviousSum, previousSum, previousS)) => {
Some((Some(previousSum), sg.plus(v, previousSum), s))
}
case _ => Some((None, v, s))
}
}
.flatMap{
case (k, maybeAcc) =>
for (
acc <- maybeAcc;
previousSum <- acc._1
) yield { (k, acc._3) -> (None, previousSum) }
}

val summands = pipe
.map {
case (k, (u: U, v: V)) =>
(k, partition(u)) -> (Some(u), v)
} ++ sumPerS

summands
.group
.sortBy { case (u, _) => u }
.scanLeft(None: Option[(Option[U], V)]) {
case (acc, (maybeU, v)) =>
acc match {
case Some((_, previousSum)) => Some((maybeU, sg.plus(v, previousSum)))
case _ => Some((maybeU, v))
}
}
.flatMap {
case ((k, s), acc) =>
for (uv <- acc; u <- uv._1) yield {
(k, (u, uv._2))
}
}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.twitter.scalding

import org.specs._
import com.twitter.scalding._

import com.twitter.scalding.typed.CumulativeSum._

class AddRankingWithCumulativeSum(args: Args) extends Job(args) {
TypedPipe.from(TypedTsv[(String, Double)]("input1"))
.map {
case (gender, height) =>
(gender, (height, 1L))
}
.cumulativeSum
.map {
case (gender, (height, rank)) =>
(gender, height, rank)
}
.write(TypedTsv("result1"))
}

class AddRankingWithPartitionedCumulativeSum(args: Args) extends Job(args) {
TypedPipe.from(TypedTsv[(String, Double)]("input1"))
.map {
case (gender, height) =>
(gender, (height, 1L))
}
.cumulativeSum { h => (h / 100).floor.toLong }
.map {
case (gender, (height, rank)) =>
(gender, height, rank)
}
.write(TypedTsv("result1"))
}

class CumulativeSumTest1 extends Specification {

// --- A simple ranking job
val sampleInput1 = List(
("male", "165.2"),
("female", "172.2"),
("male", "184.1"),
("male", "125.4"),
("female", "128.6"),
("male", "265.2"),
("female", "272.2"),
("male", "284.1"),
("male", "225.4"),
("female", "228.6"))

// Each group sorted and ranking added highest person to shortest
val expectedOutput1 = Set(
("male", 184.1, 3),
("male", 165.2, 2),
("male", 125.4, 1),
("female", 172.2, 2),
("female", 128.6, 1),
("male", 284.1, 6),
("male", 265.2, 5),
("male", 225.4, 4),
("female", 272.2, 4),
("female", 228.6, 3))

"A simple ranking cumulative sum job" should {
JobTest("com.twitter.scalding.AddRankingWithCumulativeSum")
.source(TypedTsv[(String, Double)]("input1"), sampleInput1)
.sink[(String, Double, Long)](TypedTsv[(String, Double, Long)]("result1")) { outBuf1 =>
"produce correct number of records when filtering out null values" in {
outBuf1.size must_== 10
}
"create correct ranking per group, 1st being the heighest person of that group" in {
outBuf1.toSet must_== expectedOutput1
}
}
.run
.finish
}

"A partitioned ranking cumulative sum job" should {
JobTest("com.twitter.scalding.AddRankingWithPartitionedCumulativeSum")
.source(TypedTsv[(String, Double)]("input1"), sampleInput1)
.sink[(String, Double, Long)](TypedTsv[(String, Double, Long)]("result1")) { outBuf1 =>
"produce correct number of records when filtering out null values" in {
outBuf1.size must_== 10
}
"create correct ranking per group, 1st being the heighest person of that group" in {
outBuf1.toSet must_== expectedOutput1
}
}
.run
.finish
}
}