From 03a643d312d2a39971635fd8fc8ddd0d771fbbc8 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 4 Oct 2016 11:24:03 +0100 Subject: [PATCH 1/6] Remove inactive KMeans 'runs' param; allow fewer than k centroids to be chosen too --- .../spark/mllib/clustering/KMeans.scala | 234 ++++++------------ .../spark/mllib/clustering/KMeansSuite.scala | 4 +- 2 files changed, 78 insertions(+), 160 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 23141aaf42b49..99f96180a4ad0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -43,21 +43,21 @@ import org.apache.spark.util.random.XORShiftRandom class KMeans private ( private var k: Int, private var maxIterations: Int, - private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double, private var seed: Long) extends Serializable with Logging { /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, + * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random}. */ @Since("0.8.0") - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) + def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) /** - * Number of clusters to create (k). + * Number of clusters to create (k). Note that if the input has fewer than k elements, + * then it's possible that fewer than k clusters are created. */ @Since("1.4.0") def getK: Int = k @@ -112,15 +112,17 @@ class KMeans private ( * This function has no effect since Spark 2.0.0. */ @Since("1.4.0") + @deprecated("This has no effect and always returns 1", "2.1.0") def getRuns: Int = { logWarning("Getting number of runs has no effect since Spark 2.0.0.") - runs + 1 } /** * This function has no effect since Spark 2.0.0. 
*/ @Since("0.8.0") + @deprecated("This has no effect", "2.1.0") def setRuns(runs: Int): this.type = { logWarning("Setting number of runs has no effect since Spark 2.0.0.") this @@ -239,17 +241,9 @@ class KMeans private ( val initStartTime = System.nanoTime() - // Only one run is allowed when initialModel is given - val numRuns = if (initialModel.nonEmpty) { - if (runs > 1) logWarning("Ignoring runs; one run is allowed when initialModel is given.") - 1 - } else { - runs - } - val centers = initialModel match { case Some(kMeansCenters) => - Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s))) + kMeansCenters.clusterCenters.map(new VectorWithNorm(_)) case None => if (initializationMode == KMeans.RANDOM) { initRandom(data) @@ -258,89 +252,62 @@ class KMeans private ( } } val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 - logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") + logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") - val active = Array.fill(numRuns)(true) - val costs = Array.fill(numRuns)(0.0) - - var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns) + var active = true + var cost = 0.0 var iteration = 0 val iterationStartTime = System.nanoTime() - instr.foreach(_.logNumFeatures(centers(0)(0).vector.size)) - - // Execute iterations of Lloyd's algorithm until all runs have converged - while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (Vector, Long) - def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = { - axpy(1.0, x._1, y._1) - (y._1, x._2 + y._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.doubleAccumulator) + instr.foreach(_.logNumFeatures(centers.head.vector.size)) - val bcActiveCenters = sc.broadcast(activeCenters) + // Execute iterations of Lloyd's algorithm until converged + while (iteration < maxIterations && active) { + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers) // Find the sum and count of points mapping to each center val totalContribs = data.mapPartitions { points => - val thisActiveCenters = bcActiveCenters.value - val runs = thisActiveCenters.length - val k = thisActiveCenters(0).length - val dims = thisActiveCenters(0)(0).vector.size + val thisCenters = bcCenters.value + val dims = thisCenters.head.vector.size - val sums = Array.fill(runs, k)(Vectors.zeros(dims)) - val counts = Array.fill(runs, k)(0L) + val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims)) + val counts = Array.fill(thisCenters.length)(0L) points.foreach { point => - (0 until runs).foreach { i => - val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) - costAccums(i).add(cost) - val sum = sums(i)(bestCenter) - axpy(1.0, point.vector, sum) - counts(i)(bestCenter) += 1 - } + val (bestCenter, cost) = KMeans.findClosest(thisCenters, point) + costAccum.add(cost) + val sum = sums(bestCenter) + axpy(1.0, point.vector, sum) + counts(bestCenter) += 1 } - val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) - } - contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - bcActiveCenters.destroy(blocking = false) - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { - var changed = false - var j = 0 - while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { - scal(1.0 / count, 
sum) - val newCenter = new VectorWithNorm(sum) - if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true - } - centers(run)(j) = newCenter - } - j += 1 + counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => + axpy(1.0, sum2, sum1) + (sum1, count1 + count2) + }.collectAsMap() + + bcCenters.destroy(blocking = false) + + // Update the cluster centers and costs + active = false + totalContribs.foreach { case (j, (sum, count)) => + scal(1.0 / count, sum) + val newCenter = new VectorWithNorm(sum) + if (!active && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + active = true } - if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") - } - costs(run) = costAccums(i).value + centers(j) = newCenter } - activeRuns = activeRuns.filter(active(_)) + cost = costAccum.value iteration += 1 } val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 - logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.") + logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.") if (iteration == maxIterations) { logInfo(s"KMeans reached the max number of iterations: $maxIterations.") @@ -348,27 +315,21 @@ class KMeans private ( logInfo(s"KMeans converged in $iteration iterations.") } - val (minCost, bestRun) = costs.zipWithIndex.min - - logInfo(s"The cost for the best run is $minCost.") + logInfo(s"The cost is $cost.") - new KMeansModel(centers(bestRun).map(_.vector)) + new KMeansModel(centers.map(_.vector)) } /** - * Initialize `runs` sets of cluster centers at random. + * Initialize set of cluster centers at random. */ - private def initRandom(data: RDD[VectorWithNorm]) - : Array[Array[VectorWithNorm]] = { - // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq - Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => - new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm) - }.toArray) + private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = { + val sample = data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()) + sample.map(v => new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)) } /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * Initialize set of cluster centers using the k-means|| algorithm by Bahmani et al. * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries * to find with dissimilar cluster centers by starting with a random center and then doing * passes where more centers are chosen with probability proportional to their squared distance @@ -376,31 +337,22 @@ class KMeans private ( * * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. */ - private def initKMeansParallel(data: RDD[VectorWithNorm]) - : Array[Array[VectorWithNorm]] = { + private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = { // Initialize empty centers and point costs. - val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm]) - var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity)) + var costs = data.map(_ => Double.PositiveInfinity) // Initialize each run's first center to a random point. 
val seed = new XORShiftRandom(this.seed).nextInt() - val sample = data.takeSample(true, runs, seed).toSeq + val sample = data.takeSample(false, 1, seed) // Could be empty if data is empty; fail with a better message early: - require(sample.size >= runs, s"Required $runs samples but got ${sample.size} from $data") - val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) - - /** Merges new centers to centers. */ - def mergeNewCenters(): Unit = { - var r = 0 - while (r < runs) { - centers(r) ++= newCenters(r) - newCenters(r).clear() - r += 1 - } - } + require(sample.nonEmpty, s"No samples available from $data") + + val centers = ArrayBuffer[VectorWithNorm]() + var newCenters = Seq(sample.head.toDense) + centers ++= newCenters - // On each step, sample 2 * k points on average for each run with probability proportional - // to their squared distance from that run's centers. Note that only distances between points + // On each step, sample 2 * k points on average with probability proportional + // to their squared distance from the centers. Note that only distances between points // and new centers are computed in each iteration. var step = 0 var bcNewCentersList = ArrayBuffer[Broadcast[_]]() @@ -409,74 +361,39 @@ class KMeans private ( bcNewCentersList += bcNewCenters val preCosts = costs costs = data.zip(preCosts).map { case (point, cost) => - Array.tabulate(runs) { r => - math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) - } - }.persist(StorageLevel.MEMORY_AND_DISK) - val sumCosts = costs - .aggregate(new Array[Double](runs))( - seqOp = (s, v) => { - // s += v - var r = 0 - while (r < runs) { - s(r) += v(r) - r += 1 - } - s - }, - combOp = (s0, s1) => { - // s0 += s1 - var r = 0 - while (r < runs) { - s0(r) += s1(r) - r += 1 - } - s0 - } - ) + math.min(KMeans.pointCost(bcNewCenters.value, point), cost) + }.persist(StorageLevel.MEMORY_AND_DISK) + val sumCosts = costs.sum() bcNewCenters.unpersist(blocking = false) preCosts.unpersist(blocking = false) - val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => + val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - pointsWithCosts.flatMap { case (p, c) => - val rs = (0 until runs).filter { r => - rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r) - } - if (rs.nonEmpty) Some((p, rs)) else None - } + pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1) }.collect() - mergeNewCenters() - chosen.foreach { case (p, rs) => - rs.foreach(newCenters(_) += p.toDense) - } + newCenters = chosen.map(_.toDense) + centers ++= newCenters step += 1 } - mergeNewCenters() costs.unpersist(blocking = false) bcNewCentersList.foreach(_.destroy(false)) - // Finally, we might have a set of more than k candidate centers for each run; weigh each + if (centers.size <= k) { + return centers.toArray + } + + // Finally, we might have a set of more than k candidate centers; weight each // candidate by the number of points in the dataset mapping to it and run a local k-means++ // on the weighted centers to pick just k of them val bcCenters = data.context.broadcast(centers) - val weightMap = data.flatMap { p => - Iterator.tabulate(runs) { r => - ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) - } - }.reduceByKey(_ + _).collectAsMap() + val countMap = data.map(p => KMeans.findClosest(bcCenters.value, p)._1).countByValue() bcCenters.destroy(blocking = false) - val finalCenters = (0 until 
runs).par.map { r => - val myCenters = centers(r).toArray - val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray - LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30) - } - - finalCenters.toArray + val myWeights = centers.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray + LocalKMeans.kMeansPlusPlus(0, centers.toArray, myWeights, k, 30) } } @@ -558,6 +475,7 @@ object KMeans { * Trains a k-means model using specified parameters and the default values for unspecified. */ @Since("0.8.0") + @deprecated("Use train method without 'runs'", "2.1.0") def train( data: RDD[Vector], k: Int, diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 2d35b312083c0..a207733b47ffe 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -75,7 +75,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { // Make sure code runs. var model = KMeans.train(data, k = 2, maxIterations = 1) - assert(model.clusterCenters.size === 2) + assert(model.clusterCenters.size === 1) } test("more clusters than points") { @@ -87,7 +87,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { // Make sure code runs. var model = KMeans.train(data, k = 3, maxIterations = 1) - assert(model.clusterCenters.size === 3) + assert(model.clusterCenters.size === 2) } test("deterministic initialization") { From 0e501725b8ca6376a2976c323b96d126159af347 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 5 Oct 2016 08:32:03 +0100 Subject: [PATCH 2/6] Review changes --- .../spark/mllib/clustering/KMeans.scala | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 99f96180a4ad0..0f6f241d78ed1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -254,7 +254,7 @@ class KMeans private ( val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") - var active = true + var converged = false var cost = 0.0 var iteration = 0 @@ -263,7 +263,7 @@ class KMeans private ( instr.foreach(_.logNumFeatures(centers.head.vector.size)) // Execute iterations of Lloyd's algorithm until converged - while (iteration < maxIterations && active) { + while (iteration < maxIterations && !converged) { val costAccum = sc.doubleAccumulator val bcCenters = sc.broadcast(centers) @@ -292,12 +292,12 @@ class KMeans private ( bcCenters.destroy(blocking = false) // Update the cluster centers and costs - active = false + converged = true totalContribs.foreach { case (j, (sum, count)) => scal(1.0 / count, sum) val newCenter = new VectorWithNorm(sum) - if (!active && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { - active = true + if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) { + converged = false } centers(j) = newCenter } @@ -321,17 +321,16 @@ class KMeans private ( } /** - * Initialize set of cluster centers at random. + * Initialize a set of cluster centers at random. 
*/ private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = { - val sample = data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()) - sample.map(v => new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)) + data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()).map(_.toDense) } /** - * Initialize set of cluster centers using the k-means|| algorithm by Bahmani et al. + * Initialize a set of cluster centers using the k-means|| algorithm by Bahmani et al. * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find with dissimilar cluster centers by starting with a random center and then doing + * to find dissimilar cluster centers by starting with a random center and then doing * passes where more centers are chosen with probability proportional to their squared distance * to the current cluster set. It results in a provable approximation to an optimal clustering. * @@ -341,7 +340,7 @@ class KMeans private ( // Initialize empty centers and point costs. var costs = data.map(_ => Double.PositiveInfinity) - // Initialize each run's first center to a random point. + // Initialize the first center to a random point. val seed = new XORShiftRandom(this.seed).nextInt() val sample = data.takeSample(false, 1, seed) // Could be empty if data is empty; fail with a better message early: @@ -381,19 +380,19 @@ class KMeans private ( bcNewCentersList.foreach(_.destroy(false)) if (centers.size <= k) { - return centers.toArray - } - - // Finally, we might have a set of more than k candidate centers; weight each - // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick just k of them - val bcCenters = data.context.broadcast(centers) - val countMap = data.map(p => KMeans.findClosest(bcCenters.value, p)._1).countByValue() + centers.toArray + } else { + // Finally, we might have a set of more than k candidate centers; weight each + // candidate by the number of points in the dataset mapping to it and run a local k-means++ + // on the weighted centers to pick just k of them + val bcCenters = data.context.broadcast(centers) + val countMap = data.map(KMeans.findClosest(bcCenters.value, _)._1).countByValue() - bcCenters.destroy(blocking = false) + bcCenters.destroy(blocking = false) - val myWeights = centers.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray - LocalKMeans.kMeansPlusPlus(0, centers.toArray, myWeights, k, 30) + val myWeights = centers.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray + LocalKMeans.kMeansPlusPlus(0, centers.toArray, myWeights, k, 30) + } } } From 68e3d90a790060f496cdbfeea8171e5f531633e8 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sat, 8 Oct 2016 14:24:09 +0100 Subject: [PATCH 3/6] Back out change to avoid duplicate centroids --- .../scala/org/apache/spark/mllib/clustering/KMeans.scala | 8 ++++---- .../org/apache/spark/mllib/clustering/KMeansSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 0f6f241d78ed1..12be9e351342c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -324,7 +324,7 @@ class KMeans private ( * Initialize a set of cluster centers at random. 
*/ private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = { - data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()).map(_.toDense) + data.takeSample(true, k, new XORShiftRandom(this.seed).nextInt()).map(_.toDense) } /** @@ -379,12 +379,12 @@ class KMeans private ( costs.unpersist(blocking = false) bcNewCentersList.foreach(_.destroy(false)) - if (centers.size <= k) { + if (centers.size == k) { centers.toArray } else { - // Finally, we might have a set of more than k candidate centers; weight each + // Finally, we might have a set of more or less than k candidate centers; weight each // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick just k of them + // on the weighted centers to pick k of them val bcCenters = data.context.broadcast(centers) val countMap = data.map(KMeans.findClosest(bcCenters.value, _)._1).countByValue() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index a207733b47ffe..2d35b312083c0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -75,7 +75,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { // Make sure code runs. var model = KMeans.train(data, k = 2, maxIterations = 1) - assert(model.clusterCenters.size === 1) + assert(model.clusterCenters.size === 2) } test("more clusters than points") { @@ -87,7 +87,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { // Make sure code runs. var model = KMeans.train(data, k = 3, maxIterations = 1) - assert(model.clusterCenters.size === 2) + assert(model.clusterCenters.size === 3) } test("deterministic initialization") { From 5cb9e5f5438c94e3f44545ab65f8ccd7a85f55af Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 10 Oct 2016 19:00:28 +0100 Subject: [PATCH 4/6] Fix comment; deprecate remaining train() methods with 'run' parameter --- .../spark/mllib/clustering/KMeans.scala | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 12be9e351342c..c1f7072db41ab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -56,8 +56,7 @@ class KMeans private ( def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong()) /** - * Number of clusters to create (k). Note that if the input has fewer than k elements, - * then it's possible that fewer than k clusters are created. + * Number of clusters to create (k). */ @Since("1.4.0") def getK: Int = k @@ -409,6 +408,30 @@ object KMeans { @Since("0.8.0") val K_MEANS_PARALLEL = "k-means||" + /** + * Trains a k-means model using the given set of parameters. + * + * @param data Training points as an `RDD` of `Vector` types. + * @param k Number of clusters to create. + * @param maxIterations Maximum number of iterations allowed. + * @param initializationMode The initialization algorithm. This can either be "random" or + * "k-means||". (default: "k-means||") + * @param seed Random seed for cluster initialization. Default is to generate seed based + * on system time. 
+ */ + @Since("2.1.0") + def train(data: RDD[Vector], + k: Int, + maxIterations: Int, + initializationMode: String, + seed: Long): KMeansModel = { + new KMeans().setK(k) + .setMaxIterations(maxIterations) + .setInitializationMode(initializationMode) + .setSeed(seed) + .run(data) + } + /** * Trains a k-means model using the given set of parameters. * @@ -422,6 +445,7 @@ object KMeans { * on system time. */ @Since("1.3.0") + @deprecated("Use train method without 'runs'", "2.1.0") def train( data: RDD[Vector], k: Int, @@ -447,6 +471,7 @@ object KMeans { * "k-means||". (default: "k-means||") */ @Since("0.8.0") + @deprecated("Use train method without 'runs'", "2.1.0") def train( data: RDD[Vector], k: Int, @@ -480,7 +505,9 @@ object KMeans { k: Int, maxIterations: Int, runs: Int): KMeansModel = { - train(data, k, maxIterations, runs, K_MEANS_PARALLEL) + new KMeans().setK(k) + .setMaxIterations(maxIterations) + .run(data) } /** From 84fb22f9817a2d7660eaae44442a6e88fd8471c1 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 10 Oct 2016 23:02:21 +0200 Subject: [PATCH 5/6] Fix scaladoc style --- .../spark/mllib/clustering/KMeans.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index c1f7072db41ab..6df9f501f50e0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -409,16 +409,16 @@ object KMeans { val K_MEANS_PARALLEL = "k-means||" /** - * Trains a k-means model using the given set of parameters. - * - * @param data Training points as an `RDD` of `Vector` types. - * @param k Number of clusters to create. - * @param maxIterations Maximum number of iterations allowed. - * @param initializationMode The initialization algorithm. This can either be "random" or - * "k-means||". (default: "k-means||") - * @param seed Random seed for cluster initialization. Default is to generate seed based - * on system time. - */ + * Trains a k-means model using the given set of parameters. + * + * @param data Training points as an `RDD` of `Vector` types. + * @param k Number of clusters to create. + * @param maxIterations Maximum number of iterations allowed. + * @param initializationMode The initialization algorithm. This can either be "random" or + * "k-means||". (default: "k-means||") + * @param seed Random seed for cluster initialization. Default is to generate seed based + * on system time. + */ @Since("2.1.0") def train(data: RDD[Vector], k: Int, From ba52582a1313be9d9febe215fe6f21b2d9be239f Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 11 Oct 2016 09:40:42 +0200 Subject: [PATCH 6/6] Add another overload, fix train args style, remove internal call to deprecated method --- .../spark/mllib/clustering/KMeans.scala | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 6df9f501f50e0..68a7b3b6763af 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -420,11 +420,12 @@ object KMeans { * on system time. 
*/ @Since("2.1.0") - def train(data: RDD[Vector], - k: Int, - maxIterations: Int, - initializationMode: String, - seed: Long): KMeansModel = { + def train( + data: RDD[Vector], + k: Int, + maxIterations: Int, + initializationMode: String, + seed: Long): KMeansModel = { new KMeans().setK(k) .setMaxIterations(maxIterations) .setInitializationMode(initializationMode) @@ -432,6 +433,27 @@ object KMeans { .run(data) } + /** + * Trains a k-means model using the given set of parameters. + * + * @param data Training points as an `RDD` of `Vector` types. + * @param k Number of clusters to create. + * @param maxIterations Maximum number of iterations allowed. + * @param initializationMode The initialization algorithm. This can either be "random" or + * "k-means||". (default: "k-means||") + */ + @Since("2.1.0") + def train( + data: RDD[Vector], + k: Int, + maxIterations: Int, + initializationMode: String): KMeansModel = { + new KMeans().setK(k) + .setMaxIterations(maxIterations) + .setInitializationMode(initializationMode) + .run(data) + } + /** * Trains a k-means model using the given set of parameters. * @@ -492,7 +514,9 @@ object KMeans { data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel = { - train(data, k, maxIterations, 1, K_MEANS_PARALLEL) + new KMeans().setK(k) + .setMaxIterations(maxIterations) + .run(data) } /**
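
With the runs dimension removed (PATCH 1/6), each Lloyd's iteration in
runAlgorithm boils down to one pass that accumulates a (sum, count) pair per
center and then divides to obtain the new means; a center that attracts no
points keeps its previous value. Below is a minimal in-memory sketch of that
step, assuming plain Array[Double] points instead of Spark's VectorWithNorm;
every name in it is illustrative, not the patched API.

object LloydSketch {
  type Point = Array[Double]

  // Squared Euclidean distance (the patch uses a norm-assisted fast variant).
  def squaredDistance(a: Point, b: Point): Double = {
    var sum = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); sum += d * d; i += 1 }
    sum
  }

  // Index of the nearest center and the squared distance to it.
  def findClosest(centers: Array[Point], p: Point): (Int, Double) = {
    var bestIndex = 0
    var bestCost = Double.PositiveInfinity
    var i = 0
    while (i < centers.length) {
      val c = squaredDistance(centers(i), p)
      if (c < bestCost) { bestCost = c; bestIndex = i }
      i += 1
    }
    (bestIndex, bestCost)
  }

  // One Lloyd's step: returns the updated centers and the total assignment
  // cost, the two quantities the patched loop tracks per iteration.
  def lloydStep(points: Seq[Point], centers: Array[Point]): (Array[Point], Double) = {
    val dims = centers.head.length
    val sums = Array.fill(centers.length)(new Array[Double](dims))
    val counts = new Array[Long](centers.length)
    var cost = 0.0
    points.foreach { p =>
      val (best, c) = findClosest(centers, p)
      cost += c
      var i = 0
      while (i < dims) { sums(best)(i) += p(i); i += 1 }
      counts(best) += 1
    }
    // Empty clusters keep their old center, matching the patched code, which
    // only updates indices that appear in totalContribs (count > 0).
    val newCenters = centers.indices.map { j =>
      if (counts(j) > 0) sums(j).map(_ / counts(j)) else centers(j)
    }.toArray
    (newCenters, cost)
  }
}

Convergence is then the same test as in the patch: iterate until no center
moves by more than epsilon, compared via squared distance against
epsilon * epsilon.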
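
The single-run initKMeansParallel keeps the structure of Bahmani et al.'s
k-means||: seed with one random point, then on each of initializationSteps
passes keep every point with probability proportional to its current cost
(squared distance to the nearest center chosen so far), which selects about
2 * k new centers per pass. A local sketch of one sampling pass, under the
same illustrative types as above:

import scala.util.Random

// One k-means|| oversampling pass: an in-memory stand-in for the RDD version,
// which applies the same filter per partition with a per-partition-seeded RNG.
def samplePass(
    points: Seq[LloydSketch.Point],
    costs: Seq[Double],          // current cost of each point
    k: Int,
    rand: Random): Seq[LloydSketch.Point] = {
  val sumCosts = costs.sum
  points.zip(costs).collect {
    case (p, c) if rand.nextDouble() < 2.0 * c * k / sumCosts => p
  }
}

Between passes the caller lowers each point's cost to math.min(oldCost,
cost against the newly chosen centers), the same update the patched loop
performs against the broadcast bcNewCenters.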
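
When the passes finish with more than k candidates, the patch weights each
candidate by the number of points whose closest center it is (the countByValue
over closest-center indices) and hands the weighted set to
LocalKMeans.kMeansPlusPlus. A local stand-in for that weighting, reusing
findClosest from the first sketch:

// Weight candidates by assignment counts; a candidate that attracts no points
// gets weight 0.0, mirroring countMap.getOrElse(_, 0L) in the patch.
def candidateWeights(
    points: Seq[LloydSketch.Point],
    candidates: Array[LloydSketch.Point]): Array[Double] = {
  val weights = new Array[Double](candidates.length)
  points.foreach { p =>
    weights(LloydSketch.findClosest(candidates, p)._1) += 1.0
  }
  weights
}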
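
For callers, the deprecated runs-taking train overloads migrate to the new
2.1.0 overloads, or to the setter API that those overloads delegate to. A
hypothetical usage, assuming an existing SparkContext named sc:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))

// New overload from this series: (data, k, maxIterations, initializationMode).
val model = KMeans.train(data, 2, 10, KMeans.K_MEANS_PARALLEL)

// Equivalent setter-style call.
val model2 = new KMeans()
  .setK(2)
  .setMaxIterations(10)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .run(data)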