57 changes: 52 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -265,7 +265,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val samplingFunc = if (withReplacement) {
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)._1
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
@@ -295,15 +295,62 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val samplingFunc = if (withReplacement) {
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, true, seed)
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)._1
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}

/**
* ::Experimental::
* Return random, non-overlapping splits of this RDD sampled by key (via stratified sampling)
* with each split containing exactly math.ceil(numItems * samplingRate) items for each
* stratum when exact subsampling is requested.
*
* This method differs from [[sampleByKey]] and [[sampleByKeyExact]] in that it provides random
* splits (and their complements) instead of just a subsample of the data. This requires
* segmenting the random values drawn per item into ranges with lower and upper bounds,
* rather than a single high/low threshold over the entire dataset.
*
* @param weights array of maps of (key -> samplingRate) pairs for each split, normalized per key
* @param exact boolean specifying whether to use exact subsampling
* @param seed seed for the random number generator
* @return array of tuples containing the subsample and complement RDDs for each split
*/
@Experimental
def randomSplitByKey(
weights: Array[Map[K, Double]],
exact: Boolean = false,
seed: Long = Utils.random.nextLong): Array[(RDD[(K, V)], RDD[(K, V)])] = self.withScope {

require(weights.flatMap(_.values).forall(_ >= 0.0),
"randomSplitByKey(): Sampling rates cannot be negative.")
if (weights.length > 1) {
require(weights.map(m => m.keys.toSet).sliding(2).forall(t => t(0) == t(1)),
"randomSplitByKey(): Each split must specify fractions for each key.")
}
require(weights.nonEmpty, "randomSplitByKey(): Split weights cannot be empty.")
val sumWeights = weights.foldLeft(mutable.HashMap.empty[K, Double].withDefaultValue(0.0)) {
case (acc, fractions) =>
fractions.foreach { case (k, v) => acc(k) += v }
acc
}
val normedWeights = weights.map { fractions =>
fractions.map { case (k, v) =>
val keySum = sumWeights(k)
k -> (if (keySum > 0.0) v / keySum else 0.0)
}
}
val samplingFuncs =
StratifiedSamplingUtils.getBernoulliCellSamplingFunctions(self, normedWeights, exact, seed)

samplingFuncs.map { case (func, complementFunc) =>
(self.mapPartitionsWithIndex(func, preservesPartitioning = true),
self.mapPartitionsWithIndex(complementFunc, preservesPartitioning = true))
}.toArray
}
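
A hypothetical usage sketch of the new API follows (the data, key names, and seed are invented for illustration and are not part of the patch). Because the weights are normalized per key, Array(Map("a" -> 0.8), Map("a" -> 0.2)) and Array(Map("a" -> 4.0), Map("a" -> 1.0)) request the same 80/20 split.

// Hypothetical usage: an 80/20 stratified train/test split that preserves
// per-key proportions. `sc` is assumed to be an existing SparkContext.
val data: RDD[(String, Int)] = sc.parallelize(Seq(
  ("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5)))
val weights = Array(
  Map("a" -> 0.8, "b" -> 0.8),  // first split: ~80% of each stratum
  Map("a" -> 0.2, "b" -> 0.2))  // second split: the remainder
val Array((train, trainComplement), (test, testComplement)) =
  data.randomSplitByKey(weights, exact = true, seed = 17L)
// Splits are non-overlapping: within each stratum, trainComplement holds
// exactly the items that went to test, and vice versa.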

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -52,103 +52,133 @@ import org.apache.spark.rdd.RDD

private[spark] object StratifiedSamplingUtils extends Logging {

type StratifiedSamplingFunc[K, V] = (Int, Iterator[(K, V)]) => Iterator[(K, V)]

/**
* Count the number of items instantly accepted and generate the waitlist for each stratum.
*
* This is only invoked when exact sample size is required.
*/
def getAcceptanceResults[K, V](rdd: RDD[(K, V)],
withReplacement: Boolean,
fractions: Map[K, Double],
fractions: Seq[Map[K, Double]],
counts: Option[Map[K, Long]],
seed: Long): mutable.Map[K, AcceptanceResult] = {
seed: Long): Seq[mutable.Map[K, AcceptanceResult]] = {
val combOp = getCombOp[K]
val mappedPartitionRDD = rdd.mapPartitionsWithIndex { case (partition, iter) =>
val zeroU: mutable.Map[K, AcceptanceResult] = new mutable.HashMap[K, AcceptanceResult]()
val rng = new RandomDataGenerator()
rng.reSeed(seed + partition)
val seqOp = getSeqOp(withReplacement, fractions, rng, counts)
val zeroU: Array[mutable.Map[K, AcceptanceResult]] = Array.fill(fractions.length) {
new mutable.HashMap[K, AcceptanceResult]()
}
val rngs = Array.fill(fractions.length) {
val rng = new RandomDataGenerator()
rng.reSeed(seed + partition)
rng
}
val seqOp = getSeqOp(withReplacement, fractions, rngs, counts)
Iterator(iter.aggregate(zeroU)(seqOp, combOp))
}
mappedPartitionRDD.reduce(combOp)
}
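
The accept/waitlist machinery above exists so that the exact per-stratum threshold can be found in a single pass without collecting every uniform draw. A brute-force, non-distributed sketch of the quantity it ultimately computes (illustrative plain Scala, not part of the patch):

import scala.util.Random

// For one stratum of size n, exact sampling keeps an item iff its uniform
// draw is at most the sampleSize-th smallest draw. The accept/waitlist
// bounds approximate this threshold without sorting all n draws.
def exactThreshold(n: Int, fraction: Double, seed: Long): Double = {
  val rng = new Random(seed)
  val draws = Array.fill(n)(rng.nextDouble())
  val sampleSize = math.ceil(n * fraction).toInt
  draws.sorted.apply(sampleSize - 1)
}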

/**
* Convenience version of [[getAcceptanceResults()]] for a single sample.
*/
def getAcceptanceResults[K, V](
rdd: RDD[(K, V)],
withReplacement: Boolean,
fractions: Map[K, Double],
counts: Option[Map[K, Long]],
seed: Long): mutable.Map[K, AcceptanceResult] = {
getAcceptanceResults(rdd, withReplacement, Seq(fractions), counts, seed).head
}

/**
* Returns the function used by aggregate to collect sampling statistics for each partition.
*/
def getSeqOp[K, V](withReplacement: Boolean,
fractions: Map[K, Double],
rng: RandomDataGenerator,
fractions: Seq[Map[K, Double]],
rngs: Array[RandomDataGenerator],
counts: Option[Map[K, Long]]):
(mutable.Map[K, AcceptanceResult], (K, V)) => mutable.Map[K, AcceptanceResult] = {
(Array[mutable.Map[K, AcceptanceResult]], (K, V)) => Array[mutable.Map[K, AcceptanceResult]] = {
val delta = 5e-5
(result: mutable.Map[K, AcceptanceResult], item: (K, V)) => {
(results: Array[mutable.Map[K, AcceptanceResult]], item: (K, V)) => {
val key = item._1
val fraction = fractions(key)
if (!result.contains(key)) {
result += (key -> new AcceptanceResult())
}
val acceptResult = result(key)

if (withReplacement) {
// compute acceptBound and waitListBound only if they haven't been computed already
// since they don't change from iteration to iteration.
// TODO change this to the streaming version
if (acceptResult.areBoundsEmpty) {
val n = counts.get(key)
val sampleSize = math.ceil(n * fraction).toLong
val lmbd1 = PoissonBounds.getLowerBound(sampleSize)
val lmbd2 = PoissonBounds.getUpperBound(sampleSize)
acceptResult.acceptBound = lmbd1 / n
acceptResult.waitListBound = (lmbd2 - lmbd1) / n
}
val acceptBound = acceptResult.acceptBound
val copiesAccepted = if (acceptBound == 0.0) 0L else rng.nextPoisson(acceptBound)
if (copiesAccepted > 0) {
acceptResult.numAccepted += copiesAccepted
var j = 0
while (j < fractions.length) {
val fraction = fractions(j)(key)
if (!results(j).contains(key)) {
results(j) += (key -> new AcceptanceResult())
}
val copiesWaitlisted = rng.nextPoisson(acceptResult.waitListBound)
if (copiesWaitlisted > 0) {
acceptResult.waitList ++= ArrayBuffer.fill(copiesWaitlisted)(rng.nextUniform())
}
} else {
// We use the streaming version of the algorithm for sampling without replacement to avoid
// using an extra pass over the RDD for computing the count.
// Hence, acceptBound and waitListBound change on every iteration.
acceptResult.acceptBound =
BinomialBounds.getLowerBound(delta, acceptResult.numItems, fraction)
acceptResult.waitListBound =
BinomialBounds.getUpperBound(delta, acceptResult.numItems, fraction)
val acceptResult = results(j)(key)

if (withReplacement) {
// compute acceptBound and waitListBound only if they haven't been computed already
// since they don't change from iteration to iteration.
// TODO change this to the streaming version
if (acceptResult.areBoundsEmpty) {
val n = counts.get(key)
val sampleSize = math.ceil(n * fraction).toLong
val lmbd1 = PoissonBounds.getLowerBound(sampleSize)
val lmbd2 = PoissonBounds.getUpperBound(sampleSize)
acceptResult.acceptBound = lmbd1 / n
acceptResult.waitListBound = (lmbd2 - lmbd1) / n
}
val acceptBound = acceptResult.acceptBound
val copiesAccepted = if (acceptBound == 0.0) 0L else rngs(j).nextPoisson(acceptBound)
if (copiesAccepted > 0) {
acceptResult.numAccepted += copiesAccepted
}
val copiesWaitlisted = rngs(j).nextPoisson(acceptResult.waitListBound)
if (copiesWaitlisted > 0) {
acceptResult.waitList ++= ArrayBuffer.fill(copiesWaitlisted)(rngs(j).nextUniform())
}
} else {
// We use the streaming version of the algorithm for sampling without replacement to avoid
// using an extra pass over the RDD for computing the count.
// Hence, acceptBound and waitListBound change on every iteration.
acceptResult.acceptBound =
BinomialBounds.getLowerBound(delta, acceptResult.numItems, fraction)
acceptResult.waitListBound =
BinomialBounds.getUpperBound(delta, acceptResult.numItems, fraction)

val x = rng.nextUniform()
if (x < acceptResult.acceptBound) {
acceptResult.numAccepted += 1
} else if (x < acceptResult.waitListBound) {
acceptResult.waitList += x
val x = rngs(j).nextUniform()
if (x < acceptResult.acceptBound) {
acceptResult.numAccepted += 1
} else if (x < acceptResult.waitListBound) {
acceptResult.waitList += x
}
}
acceptResult.numItems += 1

j += 1
}
acceptResult.numItems += 1
result
results
}
}

/**
* Returns the function used to combine results returned by seqOp from different partitions.
*/
def getCombOp[K]: (mutable.Map[K, AcceptanceResult], mutable.Map[K, AcceptanceResult])
=> mutable.Map[K, AcceptanceResult] = {
(result1: mutable.Map[K, AcceptanceResult], result2: mutable.Map[K, AcceptanceResult]) => {
// take union of both key sets in case one partition doesn't contain all keys
result1.keySet.union(result2.keySet).foreach { key =>
// Use result2 to keep the combined result since r1 is usually empty
val entry1 = result1.get(key)
if (result2.contains(key)) {
result2(key).merge(entry1)
} else {
if (entry1.isDefined) {
result2 += (key -> entry1.get)
def getCombOp[K]: (Array[mutable.Map[K, AcceptanceResult]],
Array[mutable.Map[K, AcceptanceResult]]) => Array[mutable.Map[K, AcceptanceResult]] = {
(result1: Array[mutable.Map[K, AcceptanceResult]],
result2: Array[mutable.Map[K, AcceptanceResult]]) => {
var j = 0
while (j < result1.length) {
// take union of both key sets in case one partition doesn't contain all keys
result1(j).keySet.union(result2(j).keySet).foreach { key =>
// Use result2 to keep the combined result since r1 is usually empty
val entry1 = result1(j).get(key)
if (result2(j).contains(key)) {
result2(j)(key).merge(entry1)
} else {
if (entry1.isDefined) {
result2(j) += (key -> entry1.get)
}
}
}
j += 1
}
result2
}
@@ -188,6 +218,18 @@ private[spark] object StratifiedSamplingUtils extends Logging {
thresholdByKey
}

/**
* Convenience version of [[getBernoulliCellSamplingFunctions()]] for a single split.
*/
def getBernoulliSamplingFunction[K: ClassTag, V: ClassTag](
rdd: RDD[(K, V)],
fractions: Map[K, Double],
exact: Boolean,
seed: Long): (StratifiedSamplingFunc[K, V], StratifiedSamplingFunc[K, V]) = {
val complementFractions = fractions.map { case (k, v) => k -> (1.0 - v) }
getBernoulliCellSamplingFunctions(rdd, Seq(fractions, complementFractions), exact, seed).head
}
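
The complement trick above rewrites a single-sample request as a two-cell split whose second cell absorbs the remaining probability mass per key; only the first cell's function pair is returned. With invented values:

// fractions           == Map("a" -> 0.1, "b" -> 0.3)
// complementFractions == Map("a" -> 0.9, "b" -> 0.7)
// cells passed down   == Seq(fractions, complementFractions)
// .head               == the (sampleFunc, complementFunc) pair for cell one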

/**
* Return the per partition sampling functions used for sampling without replacement,
* one (sample, complement) function pair per split.
*
@@ -196,22 +238,71 @@ private[spark] object StratifiedSamplingUtils extends Logging {
*
* The sampling function has a unique seed per partition.
*/
def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
fractions: Map[K, Double],
def getBernoulliCellSamplingFunctions[K: ClassTag, V: ClassTag](
rdd: RDD[(K, V)],
fractions: Seq[Map[K, Double]],
exact: Boolean,
seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
var samplingRateByKey = fractions
if (exact) {
// determine threshold for each stratum and resample
val finalResult = getAcceptanceResults(rdd, false, fractions, None, seed)
samplingRateByKey = computeThresholdByKey(finalResult, fractions)
seed: Long): Seq[(StratifiedSamplingFunc[K, V], StratifiedSamplingFunc[K, V])] = {
val thresholds = splitFractionsToSplitPoints(fractions)
val innerThresholds = if (exact) {
val finalResults =
getAcceptanceResults(rdd, withReplacement = false, thresholds, None, seed)
finalResults.zip(thresholds).map { case (finalResult, thresh) =>
computeThresholdByKey(finalResult, thresh)
}
} else {
thresholds
}
val leftBound = fractions.head.map { case (k, _) => (k, 0.0) }
val rightBound = fractions.head.map { case (k, _) => (k, 1.0) }
val outerThresholds = leftBound +: innerThresholds :+ rightBound
outerThresholds.sliding(2).map { case Seq(lb, ub) =>
(getBernoulliCellSamplingFunction[K, V](lb, ub, seed),
getBernoulliCellSamplingFunction[K, V](lb, ub, seed, complement = true))
}.toSeq
}

/**
* Helper function that converts per-split fractions into cumulative split points by summing
* the maps key-wise; the final cumulative map is dropped since the caller always appends the
* outer upper bound of 1.0.
*/
private def splitFractionsToSplitPoints[K](
fractions: Seq[Map[K, Double]]): Seq[Map[K, Double]] = {
val acc = new mutable.HashMap[K, Double]()
fractions.map { splitWeights =>
splitWeights.map { case (k, v) =>
val thisKeySum = acc.getOrElseUpdate(k, 0.0)
acc(k) += v
k -> (v + thisKeySum)
}
}.dropRight(1)
}
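
A worked example of the cumulative sum with invented values, for a single key "a" and split fractions 0.5 / 0.3 / 0.2; the final running sum (1.0) is dropped here and the caller pads the outer 0.0 and 1.0 bounds:

// fractions   == Seq(Map("a" -> 0.5), Map("a" -> 0.3), Map("a" -> 0.2))
// splitPoints == Seq(Map("a" -> 0.5), Map("a" -> 0.8))
// with outer bounds:  0.0 | 0.5 | 0.8 | 1.0
// sliding(2) cells:   [0.0, 0.5), [0.5, 0.8), [0.8, 1.0)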

/**
* Return the per partition sampling function used for partitioning a dataset without
* replacement.
*
* The sampling function has a unique seed per partition.
*/
def getBernoulliCellSamplingFunction[K, V](
lb: Map[K, Double],
ub: Map[K, Double],
seed: Long,
complement: Boolean = false): StratifiedSamplingFunc[K, V] = {
(idx: Int, iter: Iterator[(K, V)]) => {
val rng = new RandomDataGenerator()
rng.reSeed(seed + idx)
// Must invoke the rng in the same pattern as getSeqOp does for sampling without
// replacement, so that the same sequence of random numbers is generated when
// creating the sample
iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))

if (complement) {
iter.filter { case (k, _) =>
val x = rng.nextUniform()
(x < lb(k)) || (x >= ub(k))
}
} else {
iter.filter { case (k, _) =>
val x = rng.nextUniform()
(x >= lb(k)) && (x < ub(k))
}
}
}
}
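
A minimal standalone sketch of the cell-filter semantics (plain Scala, invented data, not the patch's code path). One uniform draw per item decides membership; because sample and complement replay the same seed, they see identical draws and partition the stratum exactly:

import scala.util.Random

val items = (1 to 10).map(i => ("a", i))
def draws(seed: Long) = {
  val r = new Random(seed)  // same seed => identical draw sequence
  items.map(kv => (kv, r.nextDouble()))
}
val (lb, ub) = (0.5, 0.8)  // the cell for one split of stratum "a"
val sample     = draws(7L).collect { case (kv, x) if x >= lb && x < ub => kv }
val complement = draws(7L).collect { case (kv, x) if x < lb || x >= ub => kv }
assert(sample.size + complement.size == items.size)  // exact partition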

@@ -228,7 +319,7 @@ private[spark] object StratifiedSamplingUtils extends Logging {
def getPoissonSamplingFunction[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)],
fractions: Map[K, Double],
exact: Boolean,
seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
seed: Long): StratifiedSamplingFunc[K, V] = {
// TODO implement the streaming version of sampling w/ replacement that doesn't require counts
if (exact) {
val counts = Some(rdd.countByKey())