diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index 79760d69489c..6c7112b80569 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -28,10 +28,12 @@ import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
   BisectingKMeansModel => MLlibBisectingKMeansModel}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.sql.{DataFrame, Dataset}
-import org.apache.spark.sql.functions.udf
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
 import org.apache.spark.storage.StorageLevel
 
@@ -39,7 +41,8 @@ import org.apache.spark.storage.StorageLevel
  * Common params for BisectingKMeans and BisectingKMeansModel
  */
 private[clustering] trait BisectingKMeansParams extends Params with HasMaxIter
-  with HasFeaturesCol with HasSeed with HasPredictionCol with HasDistanceMeasure {
+  with HasFeaturesCol with HasSeed with HasPredictionCol with HasDistanceMeasure
+  with HasWeightCol {
 
   /**
    * The desired number of leaf clusters. Must be > 1. Default: 4.
@@ -261,20 +264,39 @@ class BisectingKMeans @Since("2.0.0") (
   @Since("2.4.0")
   def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)
 
+  /**
+   * Sets the value of param [[weightCol]].
+   * If this is not set or empty, we treat all instance weights as 1.0.
+   * Default is not set, so all instances have weight one.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): BisectingKMeansModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-    val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
+    val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
+      col($(weightCol)).cast(DoubleType)
+    } else {
+      lit(1.0)
+    }
+
+    val instances: RDD[(OldVector, Double)] = dataset
+      .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map {
+        case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight)
+      }
 
     if (handlePersistence) {
-      rdd.persist(StorageLevel.MEMORY_AND_DISK)
+      instances.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, featuresCol, predictionCol, k, maxIter, seed,
-      minDivisibleClusterSize, distanceMeasure)
+      minDivisibleClusterSize, distanceMeasure, weightCol)
 
     val bkm = new MLlibBisectingKMeans()
       .setK($(k))
@@ -282,10 +304,10 @@ class BisectingKMeans @Since("2.0.0") (
       .setMinDivisibleClusterSize($(minDivisibleClusterSize))
       .setSeed($(seed))
       .setDistanceMeasure($(distanceMeasure))
-    val parentModel = bkm.run(rdd, Some(instr))
+    val parentModel = bkm.runWithWeight(instances, Some(instr))
     val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
     if (handlePersistence) {
-      rdd.unpersist()
+      instances.unpersist()
     }
 
     val summary = new BisectingKMeansSummary(
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index b4a31d72f94b..7c12697be95c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -27,6 +27,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.util.Instrumentation
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.BLAS.axpy
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -152,24 +153,34 @@ class BisectingKMeans private (
     this
   }
-
   private[spark] def run(
       input: RDD[Vector],
       instr: Option[Instrumentation]): BisectingKMeansModel = {
-    if (input.getStorageLevel == StorageLevel.NONE) {
-      logWarning(s"The input RDD ${input.id} is not directly cached, which may hurt performance if"
-        + " its parent RDDs are also not cached.")
+    val instances: RDD[(Vector, Double)] = input.map {
+      case (point) => (point, 1.0)
     }
-    val d = input.map(_.size).first()
+    runWithWeight(instances, None)
+  }
+
+  private[spark] def runWithWeight(
+      input: RDD[(Vector, Double)],
+      instr: Option[Instrumentation]): BisectingKMeansModel = {
+    val d = input.map(_._1.size).first
     logInfo(s"Feature dimension: $d.")
 
     val dMeasure: DistanceMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
 
     // Compute and cache vector norms for fast distance computation.
-    val norms = input.map(v => Vectors.norm(v, 2.0)).persist(StorageLevel.MEMORY_AND_DISK)
-    val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) }
+    val norms = input.map(d => Vectors.norm(d._1, 2.0))
+    val vectors = input.zip(norms).map {
+      case ((x, weight), norm) => new VectorWithNorm(x, norm, weight)
+    }
+    if (input.getStorageLevel == StorageLevel.NONE) {
+      vectors.persist(StorageLevel.MEMORY_AND_DISK)
+    }
     var assignments = vectors.map(v => (ROOT_INDEX, v))
     var activeClusters = summarize(d, assignments, dMeasure)
     instr.foreach(_.logNumExamples(activeClusters.values.map(_.size).sum))
+    instr.foreach(_.logSumOfWeights(activeClusters.values.map(_.weightSum).sum))
     val rootSummary = activeClusters(ROOT_INDEX)
     val n = rootSummary.size
     logInfo(s"Number of points: $n.")
@@ -239,7 +250,7 @@ class BisectingKMeans private (
     if (indices != null) {
       indices.unpersist()
     }
-    norms.unpersist()
+    vectors.unpersist()
     val clusters = activeClusters ++ inactiveClusters
     val root = buildTree(clusters, dMeasure)
     val totalCost = root.leafNodes.map(_.cost).sum
@@ -312,14 +323,16 @@ private object BisectingKMeans extends Serializable {
   private class ClusterSummaryAggregator(val d: Int, val distanceMeasure: DistanceMeasure)
     extends Serializable {
     private var n: Long = 0L
+    private var weightSum: Double = 0.0
     private val sum: Vector = Vectors.zeros(d)
     private var sumSq: Double = 0.0
 
     /** Adds a point. */
     def add(v: VectorWithNorm): this.type = {
       n += 1L
+      weightSum += v.weight
       // TODO: use a numerically stable approach to estimate cost
-      sumSq += v.norm * v.norm
+      sumSq += v.norm * v.norm * v.weight
       distanceMeasure.updateClusterSum(v, sum)
       this
     }
@@ -327,16 +340,18 @@ private object BisectingKMeans extends Serializable {
     /** Merges another aggregator. */
     def merge(other: ClusterSummaryAggregator): this.type = {
       n += other.n
+      weightSum += other.weightSum
       sumSq += other.sumSq
-      distanceMeasure.updateClusterSum(new VectorWithNorm(other.sum), sum)
+      axpy(1.0, other.sum, sum)
       this
     }
 
     /** Returns the summary. */
     def summary: ClusterSummary = {
-      val center = distanceMeasure.centroid(sum.copy, n)
-      val cost = distanceMeasure.clusterCost(center, new VectorWithNorm(sum), n, sumSq)
-      ClusterSummary(n, center, cost)
+      val center = distanceMeasure.centroid(sum.copy, weightSum)
+      val cost = distanceMeasure.clusterCost(center, new VectorWithNorm(sum), weightSum,
+        sumSq)
+      ClusterSummary(n, weightSum, center, cost)
     }
   }
 
@@ -437,10 +452,15 @@ private object BisectingKMeans extends Serializable {
    * Summary of a cluster.
    *
    * @param size the number of points within this cluster
+   * @param weightSum the sum of instance weights within this cluster
    * @param center the center of the points within this cluster
    * @param cost the sum of squared distances to the center
    */
-  private case class ClusterSummary(size: Long, center: VectorWithNorm, cost: Double)
+  private case class ClusterSummary(
+      size: Long,
+      weightSum: Double,
+      center: VectorWithNorm,
+      cost: Double)
 }
 
 /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala
index 4d89d1fb9aa9..e83dd3723be1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/DistanceMeasure.scala
@@ -76,7 +76,7 @@ private[spark] abstract class DistanceMeasure extends Serializable {
   def clusterCost(
       centroid: VectorWithNorm,
       pointsSum: VectorWithNorm,
-      numberOfPoints: Long,
+      weightSum: Double,
       pointsSquaredNorm: Double): Double
 
   /**
@@ -84,20 +84,8 @@ private[spark] abstract class DistanceMeasure extends Serializable {
    * @param point a `VectorWithNorm` to be added to `sum` of a cluster
    * @param sum the `sum` for a cluster to be updated
    */
-  def updateClusterSum(point: VectorWithNorm, sum: Vector, weight: Double = 1.0): Unit = {
-    axpy(weight, point.vector, sum)
-  }
-
-  /**
-   * Returns a centroid for a cluster given its `sum` vector and its `count` of points.
-   *
-   * @param sum the `sum` for a cluster
-   * @param count the number of points in the cluster
-   * @return the centroid of the cluster
-   */
-  def centroid(sum: Vector, count: Long): VectorWithNorm = {
-    scal(1.0 / count, sum)
-    new VectorWithNorm(sum)
+  def updateClusterSum(point: VectorWithNorm, sum: Vector): Unit = {
+    axpy(point.weight, point.vector, sum)
   }
 
   /**
@@ -217,9 +205,9 @@ private[spark] class EuclideanDistanceMeasure extends DistanceMeasure {
   override def clusterCost(
       centroid: VectorWithNorm,
       pointsSum: VectorWithNorm,
-      numberOfPoints: Long,
+      weightSum: Double,
       pointsSquaredNorm: Double): Double = {
-    math.max(pointsSquaredNorm - numberOfPoints * centroid.norm * centroid.norm, 0.0)
+    math.max(pointsSquaredNorm - weightSum * centroid.norm * centroid.norm, 0.0)
   }
 
   /**
@@ -261,20 +249,20 @@ private[spark] class CosineDistanceMeasure extends DistanceMeasure {
    * @param point a `VectorWithNorm` to be added to `sum` of a cluster
    * @param sum the `sum` for a cluster to be updated
    */
-  override def updateClusterSum(point: VectorWithNorm, sum: Vector, weight: Double = 1.0): Unit = {
+  override def updateClusterSum(point: VectorWithNorm, sum: Vector): Unit = {
     assert(point.norm > 0, "Cosine distance is not defined for zero-length vectors.")
-    axpy(weight / point.norm, point.vector, sum)
+    axpy(point.weight / point.norm, point.vector, sum)
   }
 
   /**
    * Returns a centroid for a cluster given its `sum` vector and its `count` of points.
    *
    * @param sum the `sum` for a cluster
-   * @param count the number of points in the cluster
+   * @param weightSum the sum of weights in the cluster
    * @return the centroid of the cluster
    */
-  override def centroid(sum: Vector, count: Long): VectorWithNorm = {
-    scal(1.0 / count, sum)
+  override def centroid(sum: Vector, weightSum: Double): VectorWithNorm = {
+    scal(1.0 / weightSum, sum)
     val norm = Vectors.norm(sum, 2)
     scal(1.0 / norm, sum)
     new VectorWithNorm(sum, 1)
@@ -286,10 +274,10 @@ private[spark] class CosineDistanceMeasure extends DistanceMeasure {
   override def clusterCost(
       centroid: VectorWithNorm,
       pointsSum: VectorWithNorm,
-      numberOfPoints: Long,
+      weightSum: Double,
       pointsSquaredNorm: Double): Double = {
     val costVector = pointsSum.vector.copy
-    math.max(numberOfPoints - dot(centroid.vector, costVector) / centroid.norm, 0.0)
+    math.max(weightSum - dot(centroid.vector, costVector) / centroid.norm, 0.0)
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index b24dc2398559..a3cf7f96478a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -225,7 +225,7 @@ class KMeans private (
     }
 
     val zippedData = data.zip(norms).map { case ((v, w), norm) =>
-      (new VectorWithNorm(v, norm), w)
+      new VectorWithNorm(v, norm, w)
     }
 
     if (data.getStorageLevel == StorageLevel.NONE) {
@@ -241,7 +241,7 @@ class KMeans private (
    * Implementation of K-Means algorithm.
    */
   private def runAlgorithmWithWeight(
-      data: RDD[(VectorWithNorm, Double)],
+      data: RDD[VectorWithNorm],
       instr: Option[Instrumentation]): KMeansModel = {
 
     val sc = data.sparkContext
@@ -250,16 +250,14 @@ class KMeans private (
 
     val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure)
 
-    val dataVectorWithNorm = data.map(d => d._1)
-
     val centers = initialModel match {
       case Some(kMeansCenters) =>
        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
       case None =>
        if (initializationMode == KMeans.RANDOM) {
-          initRandom(dataVectorWithNorm)
+          initRandom(data)
        } else {
-          initKMeansParallel(dataVectorWithNorm, distanceMeasureInstance)
+          initKMeansParallel(data, distanceMeasureInstance)
        }
     }
     val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
@@ -279,7 +277,7 @@ class KMeans private (
       val bcCenters = sc.broadcast(centers)
 
       // Find the new centers
-      val collected = data.mapPartitions { pointsAndWeights =>
+      val collected = data.mapPartitions { points =>
         val thisCenters = bcCenters.value
         val dims = thisCenters.head.vector.size
 
@@ -290,11 +288,11 @@ class KMeans private (
         // sample1 * weight1/clusterWeightSum + sample2 * weight2/clusterWeightSum + ...
         val clusterWeightSum = Array.ofDim[Double](thisCenters.length)
 
-        pointsAndWeights.foreach { case (point, weight) =>
+        points.foreach { point =>
           val (bestCenter, cost) = distanceMeasureInstance.findClosest(thisCenters, point)
-          costAccum.add(cost * weight)
-          distanceMeasureInstance.updateClusterSum(point, sums(bestCenter), weight)
-          clusterWeightSum(bestCenter) += weight
+          costAccum.add(cost * point.weight)
+          distanceMeasureInstance.updateClusterSum(point, sums(bestCenter))
+          clusterWeightSum(bestCenter) += point.weight
         }
 
         clusterWeightSum.indices.filter(clusterWeightSum(_) > 0)
@@ -511,13 +509,15 @@ object KMeans {
 /**
  * A vector with its norm for fast distance computation.
  */
-private[clustering] class VectorWithNorm(val vector: Vector, val norm: Double)
-  extends Serializable {
+private[clustering] class VectorWithNorm(
+    val vector: Vector,
+    val norm: Double,
+    val weight: Double = 1.0) extends Serializable {
 
   def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
 
   def this(array: Array[Double]) = this(Vectors.dense(array))
 
   /** Converts the vector to a dense vector. */
-  def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm)
+  def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm, weight)
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index 9984451b08ce..fc756d4c710d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -189,6 +189,134 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
     model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
   }
 
+  test("Comparing with and without weightCol with cosine distance") {
+    val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+      Vectors.dense(1.0, 1.0),
+      Vectors.dense(10.0, 10.0),
+      Vectors.dense(1.0, 0.5),
+      Vectors.dense(10.0, 4.4),
+      Vectors.dense(-1.0, 1.0),
+      Vectors.dense(-100.0, 90.0)
+    )).map(v => TestRow(v)))
+
+    val model1 = new BisectingKMeans()
+      .setK(3)
+      .setDistanceMeasure(DistanceMeasure.COSINE)
+      .setSeed(1)
+      .fit(df1)
+    val predictionDf1 = model1.transform(df1)
+    checkNominalOnDF(predictionDf1, "prediction", model1.getK)
+
+    assert(predictionDf1.select("prediction").distinct().count() == 3)
+    val predictionsMap1 = predictionDf1.collect().map(row =>
+      row.getAs[Vector]("features") -> row.getAs[Int]("prediction")).toMap
+    assert(predictionsMap1(Vectors.dense(1.0, 1.0)) ==
+      predictionsMap1(Vectors.dense(10.0, 10.0)))
+    assert(predictionsMap1(Vectors.dense(1.0, 0.5)) ==
+      predictionsMap1(Vectors.dense(10.0, 4.4)))
+    assert(predictionsMap1(Vectors.dense(-1.0, 1.0)) ==
+      predictionsMap1(Vectors.dense(-100.0, 90.0)))
+
+    model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
+
+    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array(
+      (Vectors.dense(1.0, 1.0), 2.0), (Vectors.dense(10.0, 10.0), 2.0),
+      (Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 2.0),
+      (Vectors.dense(-1.0, 1.0), 2.0), (Vectors.dense(-100.0, 90.0), 2.0))))
+      .toDF("features", "weightCol")
+
+    val model2 = new BisectingKMeans()
+      .setK(3)
+      .setDistanceMeasure(DistanceMeasure.COSINE)
+      .setSeed(1)
+      .setWeightCol("weightCol")
+      .fit(df2)
+    val predictionDf2 = model2.transform(df2)
+    checkNominalOnDF(predictionDf2, "prediction", model2.getK)
+
+    assert(predictionDf2.select("prediction").distinct().count() == 3)
+    val predictionsMap2 = predictionDf2.collect().map(row =>
+      row.getAs[Vector]("features") -> row.getAs[Int]("prediction")).toMap
+    assert(predictionsMap2(Vectors.dense(1.0, 1.0)) ==
+      predictionsMap2(Vectors.dense(10.0, 10.0)))
+    assert(predictionsMap2(Vectors.dense(1.0, 0.5)) ==
+      predictionsMap2(Vectors.dense(10.0, 4.4)))
+    assert(predictionsMap2(Vectors.dense(-1.0, 1.0)) ==
+      predictionsMap2(Vectors.dense(-100.0, 90.0)))
+
+    model2.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
+    assert(model1.clusterCenters === model2.clusterCenters)
+  }
+
+  test("Comparing with and without weightCol") {
+    val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+      Vectors.dense(1.0, 1.0),
+      Vectors.dense(10.0, 10.0),
+      Vectors.dense(10.0, 10.0),
+      Vectors.dense(1.0, 0.5),
+      Vectors.dense(1.0, 0.5),
+      Vectors.dense(10.0, 4.4),
+      Vectors.dense(10.0, 4.4),
+      Vectors.dense(10.0, 4.4),
+      Vectors.dense(-1.0, 1.0),
+      Vectors.dense(-1.0, 1.0),
+      Vectors.dense(-1.0, 1.0),
+      Vectors.dense(-100.0, 90.0),
+      Vectors.dense(-100.0, 90.0),
+      Vectors.dense(-100.0, 90.0),
+      Vectors.dense(-100.0, 90.0)
+    )).map(v => TestRow(v)))
+
+    val model1 = new BisectingKMeans()
+      .setK(3)
+      .setSeed(1)
+      .fit(df1)
+    val predictionDf1 = model1.transform(df1)
+    checkNominalOnDF(predictionDf1, "prediction", model1.getK)
+
+    assert(predictionDf1.select("prediction").distinct().count() == 3)
+    val predictionsMap1 = predictionDf1.collect().map(row =>
+      row.getAs[Vector]("features") -> row.getAs[Int]("prediction")).toMap
+    assert(predictionsMap1(Vectors.dense(1.0, 1.0)) ==
+      predictionsMap1(Vectors.dense(1.0, 0.5)))
+    assert(predictionsMap1(Vectors.dense(1.0, 1.0)) ==
+      predictionsMap1(Vectors.dense(-1.0, 1.0)))
+    assert(predictionsMap1(Vectors.dense(10.0, 10.0)) ==
+      predictionsMap1(Vectors.dense(10.0, 4.4)))
+
+    model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
+
+    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array(
+      (Vectors.dense(1.0, 1.0), 1.0), (Vectors.dense(10.0, 10.0), 2.0),
+      (Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 3.0),
+      (Vectors.dense(-1.0, 1.0), 3.0), (Vectors.dense(-100.0, 90.0), 4.0))))
+      .toDF("features", "weightCol")
+
+    val model2 = new BisectingKMeans()
+      .setK(3)
+      .setSeed(1)
+      .setWeightCol("weightCol")
+      .fit(df2)
+    val predictionDf2 = model2.transform(df2)
+    checkNominalOnDF(predictionDf2, "prediction", model2.getK)
+
+    assert(predictionDf2.select("prediction").distinct().count() == 3)
+    val predictionsMap2 = predictionDf2.collect().map(row =>
+      row.getAs[Vector]("features") -> row.getAs[Int]("prediction")).toMap
+    assert(predictionsMap2(Vectors.dense(1.0, 1.0)) ==
+      predictionsMap2(Vectors.dense(1.0, 0.5)))
+    assert(predictionsMap2(Vectors.dense(1.0, 1.0)) ==
+      predictionsMap2(Vectors.dense(-1.0, 1.0)))
+    assert(predictionsMap2(Vectors.dense(10.0, 10.0)) ==
+      predictionsMap2(Vectors.dense(10.0, 4.4)))
+
+    model2.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
+
+    assert(model1.clusterCenters(0) === model2.clusterCenters(0))
+    assert(model1.clusterCenters(1) === model2.clusterCenters(1))
+    assert(model1.clusterCenters(2) ~== model2.clusterCenters(2) absTol 1e-6)
+  }
+
   test("BisectingKMeans with Array input") {
     def trainAndComputeCost(dataset: DataFrame): Double = {
       val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index d9b6b886dc51..e236ff0423c9 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -733,7 +733,7 @@ def setWeightCol(self, value):
 
 @inherit_doc
 class _BisectingKMeansParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
-                             HasDistanceMeasure):
+                             HasDistanceMeasure, HasWeightCol):
     """
     Params for :py:class:`BisectingKMeans` and :py:class:`BisectingKMeansModel`.
 
@@ -838,9 +838,9 @@ class BisectingKMeans(JavaEstimator, _BisectingKMeansParams, JavaMLWritable, Jav
     clusters, larger clusters get higher priority.
 
     >>> from pyspark.ml.linalg import Vectors
-    >>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
-    ...         (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
-    >>> df = spark.createDataFrame(data, ["features"])
+    >>> data = [(Vectors.dense([0.0, 0.0]), 2.0), (Vectors.dense([1.0, 1.0]), 2.0),
+    ...         (Vectors.dense([9.0, 8.0]), 2.0), (Vectors.dense([8.0, 9.0]), 2.0)]
+    >>> df = spark.createDataFrame(data, ["features", "weightCol"])
     >>> bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
     >>> bkm.setMaxIter(10)
     BisectingKMeans...
@@ -849,6 +849,8 @@ class BisectingKMeans(JavaEstimator, _BisectingKMeansParams, JavaMLWritable, Jav
     >>> bkm.clear(bkm.maxIter)
     >>> bkm.setSeed(1)
     BisectingKMeans...
+    >>> bkm.setWeightCol("weightCol")
+    BisectingKMeans...
     >>> bkm.getSeed()
     1
     >>> bkm.clear(bkm.seed)
@@ -872,7 +874,7 @@ class BisectingKMeans(JavaEstimator, _BisectingKMeansParams, JavaMLWritable, Jav
     >>> summary.clusterSizes
     [2, 2]
     >>> summary.trainingCost
-    2.000...
+    4.000...
     >>> transformed = model.transform(df).select("features", "newPrediction")
     >>> rows = transformed.collect()
     >>> rows[0].newPrediction == rows[1].newPrediction
@@ -901,10 +903,12 @@ class BisectingKMeans(JavaEstimator, _BisectingKMeansParams, JavaMLWritable, Jav
 
     @keyword_only
     def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20,
-                 seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean"):
+                 seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean",
+                 weightCol=None):
         """
         __init__(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
-                 seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean")
+                 seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean", \
+                 weightCol=None)
         """
         super(BisectingKMeans, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.BisectingKMeans",
@@ -916,10 +920,12 @@ def __init__(self, featuresCol="features", predictionCol="prediction", maxIter=2
     @keyword_only
     @since("2.0.0")
     def setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20,
-                  seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean"):
+                  seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean",
+                  weightCol=None):
         """
         setParams(self, featuresCol="features", predictionCol="prediction", maxIter=20, \
-                  seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean")
+                  seed=None, k=4, minDivisibleClusterSize=1.0, distanceMeasure="euclidean", \
+                  weightCol=None)
         Sets params for BisectingKMeans.
         """
         kwargs = self._input_kwargs
@@ -974,6 +980,13 @@ def setSeed(self, value):
         """
         return self._set(seed=value)
 
+    @since("3.0.0")
+    def setWeightCol(self, value):
+        """
+        Sets the value of :py:attr:`weightCol`.
+        """
+        return self._set(weightCol=value)
+
     def _create_model(self, java_model):
         return BisectingKMeansModel(java_model)
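Not part of the patch: a minimal usage sketch of the new parameter, for reviewers who want to try it locally. It assumes a running SparkSession bound to `spark`; the toy data and the column names `features` and `weight` are illustrative, and only the public `setWeightCol` setter added above is exercised.

```scala
import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.linalg.Vectors

// Toy data: each row pairs a feature vector with an instance weight.
// The column names are illustrative; any names can be handed to the estimator.
val df = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 0.0), 3.0),
  (Vectors.dense(1.0, 1.0), 1.0),
  (Vectors.dense(9.0, 8.0), 2.0),
  (Vectors.dense(8.0, 9.0), 2.0)
)).toDF("features", "weight")

val bkm = new BisectingKMeans()
  .setK(2)
  .setSeed(1L)
  .setWeightCol("weight") // new setter introduced by this patch

// Rows with larger weights pull their cluster center proportionally harder,
// roughly as if they had been duplicated in proportion to their weight.
val model = bkm.fit(df)
model.clusterCenters.foreach(println)
```

Leaving `weightCol` unset (or set to an empty string) falls back to a constant weight of 1.0 per row, matching the `lit(1.0)` branch in `fit`, so existing callers are unaffected.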