251 changes: 118 additions & 133 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -20,9 +20,9 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
import org.apache.spark.annotation.Since
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.BLAS.{axpy, scal, gemm}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -31,8 +31,7 @@ import org.apache.spark.util.random.XORShiftRandom

/**
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
* they are executed together with joint passes over the data for efficiency.
* mode (the k-means|| algorithm by Bahmani et al).
*
* This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
* to it should be cached by the user.
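Because the algorithm makes several passes over the input, the caller is expected to cache the RDD before calling run. A minimal usage sketch of the public API (the toy data and the SparkContext named sc are assumptions for illustration, not part of this patch):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Toy data; in practice this would be a large, partitioned RDD.
val points = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.2)
)).cache() // cache because run() scans the data once per iteration

val model = new KMeans()
  .setK(2)
  .setMaxIterations(20)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .setSeed(42L)
  .run(points)

println(model.clusterCenters.mkString(", "))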
@@ -227,20 +226,11 @@ class KMeans private (
private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {

val sc = data.sparkContext

val initStartTime = System.nanoTime()

// Only one run is allowed when initialModel is given
val numRuns = if (initialModel.nonEmpty) {
if (runs > 1) logWarning("Ignoring runs; one run is allowed when initialModel is given.")
1
} else {
runs
}

val centers = initialModel match {
case Some(kMeansCenters) => {
Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))
kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s))
}
case None => {
if (initializationMode == KMeans.RANDOM) {
@@ -250,211 +240,206 @@ }
}
}
}

val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
" seconds.")

val active = Array.fill(numRuns)(true)
val costs = Array.fill(numRuns)(0.0)

var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
var done = false
var costs = 0.0
var iteration = 0

val iterationStartTime = System.nanoTime()
val isSparse = data.take(1)(0).vector.isInstanceOf[SparseVector]

// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {
// Execute Lloyd's algorithm until converged or reached the max number of iterations
while (iteration < maxIterations && !done) {
type WeightedPoint = (Vector, Long)
def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
axpy(1.0, x._1, y._1)
(y._1, x._2 + y._2)
}

val activeCenters = activeRuns.map(r => centers(r)).toArray
val costAccums = activeRuns.map(_ => sc.accumulator(0.0))

val bcActiveCenters = sc.broadcast(activeCenters)
val costAccums = sc.accumulator(0.0)
val bcCenters = sc.broadcast(centers)

// Find the sum and count of points mapping to each center
val totalContribs = data.mapPartitions { points =>
val thisActiveCenters = bcActiveCenters.value
val runs = thisActiveCenters.length
val k = thisActiveCenters(0).length
val dims = thisActiveCenters(0)(0).vector.size
val thisCenters = bcCenters.value
val k = thisCenters.length
val dims = thisCenters(0).vector.size

val sums = Array.fill(k)(Vectors.zeros(dims))
val counts = Array.fill(k)(0L)

val sums = Array.fill(runs, k)(Vectors.zeros(dims))
val counts = Array.fill(runs, k)(0L)
val vectorOfPoints = new ArrayBuffer[Vector]()
val normOfPoints = new ArrayBuffer[Double]()
var numRows = 0

// Construct points matrix
points.foreach { point =>
(0 until runs).foreach { i =>
val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
costAccums(i) += cost
val sum = sums(i)(bestCenter)
axpy(1.0, point.vector, sum)
counts(i)(bestCenter) += 1
vectorOfPoints.append(point.vector)
normOfPoints.append(point.norm)
numRows += 1
}

val pointMatrix = if (isSparse) {
val coo = new ArrayBuffer[(Int, Int, Double)]()
vectorOfPoints.zipWithIndex.foreach { v =>
val sv = v._1.asInstanceOf[SparseVector]
sv.indices.indices.foreach { i =>
coo.append((v._2, sv.indices(i), sv.values(i)))
}
}
SparseMatrix.fromCOO(numRows, dims, coo.toSeq)
} else {
new DenseMatrix(numRows, dims, vectorOfPoints.flatMap(_.toArray).toArray, true)
}

val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
((i, j), (sums(i)(j), counts(i)(j)))
// Construct centers matrix
val vectorOfCenters = new ArrayBuffer[Double]()
val normOfCenters = new ArrayBuffer[Double]()
thisCenters.foreach { center =>
vectorOfCenters.appendAll(center.vector.toArray)
normOfCenters.append(center.norm)
}
val centerMatrix = new DenseMatrix(dims, k, vectorOfCenters.toArray)

val a2b2 = new ArrayBuffer[Double]()
val normOfPointsArray = normOfPoints.toArray
val normOfCentersArray = normOfCenters.toArray
for (i <- 0 until k; j <- 0 until numRows) {
a2b2.append(normOfPointsArray(j) * normOfPointsArray(j) +
normOfCentersArray(i) * normOfCentersArray(i))
}

val distanceMatrix = new DenseMatrix(numRows, k, a2b2.toArray)
gemm(-2.0, pointMatrix, centerMatrix, 1.0, distanceMatrix)

val vectorOfPointsArray = vectorOfPoints.toArray
distanceMatrix.transpose.toArray.grouped(k).toArray.map(_.zipWithIndex.min).zipWithIndex
Contributor:

Given the complexity of this line, it might be helpful to unpack the tuple and give semantic meaning to its elements. Something like:

distanceMatrix.transpose.toArray.grouped(k).toArray.map(_.zipWithIndex.min).zipWithIndex
          .foreach { case ((cost, center), i) =>
            costAccums += cost
            val sum = sums(center)
            axpy(1.0, vectorOfPointsArray(i), sum)
            counts(center) += 1
          }

Contributor Author:

Good point! Updated.

.foreach { case ((cost, bc), index) =>
costAccums += cost
val sum = sums(bc)
axpy(1.0, vectorOfPointsArray(index), sum)
counts(bc) += 1
}

val contribs = for (j <- 0 until k) yield {
(j, (sums(j), counts(j)))
}
contribs.iterator
}.reduceByKey(mergeContribs).collectAsMap()

// Update the cluster centers and costs for each active run
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
var j = 0
while (j < k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
scal(1.0 / count, sum)
val newCenter = new VectorWithNorm(sum)
if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
changed = true
}
centers(run)(j) = newCenter
var changed = false
var j = 0
while (j < k) {
val (sum, count) = totalContribs(j)
if (count != 0) {
scal(1.0 / count, sum)
val newCenter = new VectorWithNorm(sum)
if (KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
changed = true
}
j += 1
}
if (!changed) {
active(run) = false
logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
centers(j) = newCenter
}
costs(run) = costAccums(i).value
j += 1
}

activeRuns = activeRuns.filter(active(_))
costs = costAccums.value
if (!changed) {
logInfo("Run finished in " + (iteration + 1) + " iterations")
done = true
}
iteration += 1
}

val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.")

if (iteration == maxIterations) {
logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"KMeans converged in $iteration iterations.")
}
logInfo(s"The cost is $costs.")

val (minCost, bestRun) = costs.zipWithIndex.min

logInfo(s"The cost for the best run is $minCost.")

new KMeansModel(centers(bestRun).map(_.vector))
new KMeansModel(centers.map(_.vector))
}
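The mapPartitions block above replaces the old per-point findClosest loop with a blocked computation: it precomputes ||point||^2 + ||center||^2 into distanceMatrix and then calls gemm(-2.0, pointMatrix, centerMatrix, 1.0, distanceMatrix), exploiting the identity ||a - c||^2 = ||a||^2 + ||c||^2 - 2 a.c so that all point-to-center dot products come from a single matrix multiply. A self-contained sketch of that identity in plain Scala (illustrative names only; no Spark or BLAS dependencies):

// Pairwise squared distances via the expansion ||a - c||^2 = ||a||^2 + ||c||^2 - 2 a.c,
// which is what gemm(-2.0, pointMatrix, centerMatrix, 1.0, distanceMatrix) computes in bulk.
val points  = Array(Array(1.0, 2.0), Array(3.0, 4.0))   // numRows x dims
val centers = Array(Array(0.0, 0.0), Array(1.0, 1.0))   // k x dims

def dot(x: Array[Double], y: Array[Double]): Double =
  x.zip(y).map { case (a, b) => a * b }.sum

// Expanded form: squared norms plus the -2 * dot-product correction.
val expanded = Array.tabulate(points.length, centers.length) { (i, j) =>
  dot(points(i), points(i)) + dot(centers(j), centers(j)) - 2.0 * dot(points(i), centers(j))
}

// Direct form, for comparison.
val direct = Array.tabulate(points.length, centers.length) { (i, j) =>
  points(i).zip(centers(j)).map { case (a, b) => (a - b) * (a - b) }.sum
}

assert(expanded.flatten.zip(direct.flatten).forall { case (x, y) => math.abs(x - y) < 1e-9 })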

/**
* Initialize `runs` sets of cluster centers at random.
* Initialize cluster centers at random.
*/
private def initRandom(data: RDD[VectorWithNorm])
: Array[Array[VectorWithNorm]] = {
// Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq
Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)
}.toArray)
private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
// Sample cluster centers in one pass
val sample = data.takeSample(true, k, new XORShiftRandom(this.seed).nextInt()).toSeq
sample.map { v => new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm) }.toArray
}

/**
* Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
* Initialize cluster centers using the k-means|| algorithm by Bahmani et al.
* (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
 * to find dissimilar cluster centers by starting with a random center and then doing
* passes where more centers are chosen with probability proportional to their squared distance
* to the current cluster set. It results in a provable approximation to an optimal clustering.
*
* The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
*/
private def initKMeansParallel(data: RDD[VectorWithNorm])
: Array[Array[VectorWithNorm]] = {
private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
// Initialize empty centers and point costs.
val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
val centers = ArrayBuffer.empty[VectorWithNorm]
var costs = data.map(_ => Double.PositiveInfinity)

// Initialize each run's first center to a random point.
// Initialize first center to a random point.
val seed = new XORShiftRandom(this.seed).nextInt()
val sample = data.takeSample(true, runs, seed).toSeq
val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))
val sample = data.takeSample(true, 1, seed)(0)
val newCenters = ArrayBuffer(sample.toDense)

/** Merges new centers to centers. */
def mergeNewCenters(): Unit = {
var r = 0
while (r < runs) {
centers(r) ++= newCenters(r)
newCenters(r).clear()
r += 1
}
centers ++= newCenters
newCenters.clear()
}

// On each step, sample 2 * k points on average for each run with probability proportional
// to their squared distance from that run's centers. Note that only distances between points
// On each step, sample 2 * k points on average with probability proportional
// to their squared distance from the centers. Note that only distances between points
// and new centers are computed in each iteration.
var step = 0
while (step < initializationSteps) {
val bcNewCenters = data.context.broadcast(newCenters)
val preCosts = costs
costs = data.zip(preCosts).map { case (point, cost) =>
Array.tabulate(runs) { r =>
math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
}
}.persist(StorageLevel.MEMORY_AND_DISK)
val sumCosts = costs
.aggregate(new Array[Double](runs))(
seqOp = (s, v) => {
// s += v
var r = 0
while (r < runs) {
s(r) += v(r)
r += 1
}
s
},
combOp = (s0, s1) => {
// s0 += s1
var r = 0
while (r < runs) {
s0(r) += s1(r)
r += 1
}
s0
}
)
math.min(KMeans.pointCost(bcNewCenters.value, point), cost)
}.persist(StorageLevel.MEMORY_AND_DISK)
val sumCosts = costs.aggregate(0.0)(_ + _, _ + _)

preCosts.unpersist(blocking = false)
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointsWithCosts.flatMap { case (p, c) =>
val rs = (0 until runs).filter { r =>
rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
}
if (rs.length > 0) Some(p, rs) else None
val rs = rand.nextDouble() < 2.0 * c * k / sumCosts
if (rs) Some(p) else None
}
}.collect()
mergeNewCenters()
chosen.foreach { case (p, rs) =>
rs.foreach(newCenters(_) += p.toDense)
}
chosen.foreach { case p => newCenters += p.toDense }
step += 1
}

mergeNewCenters()
costs.unpersist(blocking = false)

// Finally, we might have a set of more than k candidate centers for each run; weigh each
// Finally, we might have a set of more than k candidate centers; weight each
// candidate by the number of points in the dataset mapping to it and run a local k-means++
// on the weighted centers to pick just k of them
val bcCenters = data.context.broadcast(centers)
val weightMap = data.flatMap { p =>
Iterator.tabulate(runs) { r =>
((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
}
val weightMap = data.map { p =>
(KMeans.findClosest(bcCenters.value, p)._1, 1.0)
}.reduceByKey(_ + _).collectAsMap()
val finalCenters = (0 until runs).par.map { r =>
val myCenters = centers(r).toArray
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
}

finalCenters.toArray
val myCenters = centers.toArray
val myWeights = myCenters.indices.map(i => weightMap.getOrElse(i, 0.0)).toArray
val finalCenters = LocalKMeans.kMeansPlusPlus(0, myCenters, myWeights, k, 30)

finalCenters
}
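Each of the initializationSteps passes above keeps a point as a candidate center with probability min(1, 2 * k * cost / sumCosts), where cost is that point's current squared distance to the nearest chosen center, so roughly 2 * k candidates are added per pass; the surplus is then reduced to exactly k centers by the weighted local k-means++ at the end. A small local (non-distributed) sketch of that sampling rule, with made-up costs, purely to illustrate the probability used in the flatMap above:

import scala.util.Random

// Hypothetical per-point costs: squared distance of each point to its nearest chosen center.
val costs = Array(4.0, 0.25, 9.0, 1.0, 16.0)
val k = 2
val sumCosts = costs.sum
val rand = new Random(42)

// Each point survives with probability min(1, 2 * k * cost / sumCosts),
// so at most about 2 * k new candidates are expected per pass.
val chosen = costs.zipWithIndex.filter { case (cost, _) =>
  rand.nextDouble() < 2.0 * cost * k / sumCosts
}.map(_._2)

println(s"candidate point indices this pass: ${chosen.mkString(", ")}")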
}

@@ -385,7 +385,6 @@ object PowerIterationClustering extends Logging {
val points = v.mapValues(x => Vectors.dense(x)).cache()
val model = new KMeans()
.setK(k)
.setRuns(5)
.setSeed(0L)
.run(points.values)
points.mapValues(p => model.predict(p)).cache()