Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,12 @@ class BisectingKMeans @Since("2.0.0") (
val parentModel = bkm.run(rdd, Some(instr))
val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
val summary = new BisectingKMeansSummary(
model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter))
model.transform(dataset),
$(predictionCol),
$(featuresCol),
$(k),
$(maxIter),
parentModel.trainingCost)
instr.logNamedValue("clusterSizes", summary.clusterSizes)
instr.logNumFeatures(model.clusterCenters.head.size)
model.setSummary(Some(summary))
Expand Down Expand Up @@ -294,6 +299,8 @@ object BisectingKMeans extends DefaultParamsReadable[BisectingKMeans] {
* @param featuresCol Name for column of features in `predictions`.
* @param k Number of clusters.
* @param numIter Number of iterations.
* @param trainingCost Sum of the cost to the nearest centroid for all points in the training
* dataset. This is equivalent to sklearn's inertia.
*/
@Since("2.1.0")
@Experimental
Expand All @@ -302,4 +309,6 @@ class BisectingKMeansSummary private[clustering] (
predictionCol: String,
featuresCol: String,
k: Int,
numIter: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter)
numIter: Int,
@Since("3.0.0") val trainingCost: Double)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a big deal for 3.0, but we lose the constructor without the new param. That's probably OK as the summary kind of needs this value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this constructor is private so I don't think it is a problem to avoid having the previous one.

extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter)
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class BisectingKMeans private (
norms.unpersist(false)
val clusters = activeClusters ++ inactiveClusters
val root = buildTree(clusters, dMeasure)
new BisectingKMeansModel(root, this.distanceMeasure)
val totalCost = root.leafNodes.map(_.cost).sum
new BisectingKMeansModel(root, this.distanceMeasure, totalCost)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import org.apache.spark.sql.{Row, SparkSession}
@Since("1.6.0")
class BisectingKMeansModel private[clustering] (
private[clustering] val root: ClusteringTreeNode,
@Since("2.4.0") val distanceMeasure: String
@Since("2.4.0") val distanceMeasure: String,
@Since("3.0.0") val trainingCost: Double
) extends Serializable with Saveable with Logging {

@Since("1.6.0")
def this(root: ClusteringTreeNode) = this(root, DistanceMeasure.EUCLIDEAN)
def this(root: ClusteringTreeNode) = this(root, DistanceMeasure.EUCLIDEAN, 0.0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, we did preserve this old constructor, and that's fine to keep. The other issue I see here is that the cost is 0, when the cost is really unknown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because this is public, so users may rely on it. The idea is that this is indeed a "new feature" (previously is was not accessible) and we are not guaranteeing new features in the MLLib API. I just followed the same approach which was used for KMeans.


private val distanceMeasureInstance: DistanceMeasure =
DistanceMeasure.decodeFromString(distanceMeasure)
Expand Down Expand Up @@ -109,10 +110,10 @@ class BisectingKMeansModel private[clustering] (

@Since("2.0.0")
override def save(sc: SparkContext, path: String): Unit = {
BisectingKMeansModel.SaveLoadV2_0.save(sc, this, path)
BisectingKMeansModel.SaveLoadV3_0.save(sc, this, path)
}

override protected def formatVersion: String = "2.0"
override protected def formatVersion: String = "3.0"
}

@Since("2.0.0")
Expand All @@ -128,11 +129,15 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
case (SaveLoadV2_0.thisClassName, SaveLoadV2_0.thisFormatVersion) =>
val model = SaveLoadV2_0.load(sc, path)
model
case (SaveLoadV3_0.thisClassName, SaveLoadV3_0.thisFormatVersion) =>
val model = SaveLoadV3_0.load(sc, path)
model
case _ => throw new Exception(
s"BisectingKMeansModel.load did not recognize model with (className, format version):" +
s"($loadedClassName, $formatVersion). Supported:\n" +
s" (${SaveLoadV1_0.thisClassName}, ${SaveLoadV1_0.thisClassName}\n" +
s" (${SaveLoadV2_0.thisClassName}, ${SaveLoadV2_0.thisClassName})")
s" (${SaveLoadV2_0.thisClassName}, ${SaveLoadV2_0.thisClassName})\n" +
s" (${SaveLoadV3_0.thisClassName}, ${SaveLoadV3_0.thisClassName})")
}
}

Expand Down Expand Up @@ -195,7 +200,8 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap
val rootNode = buildTree(rootId, nodes)
new BisectingKMeansModel(rootNode, DistanceMeasure.EUCLIDEAN)
val totalCost = rootNode.leafNodes.map(_.cost).sum
new BisectingKMeansModel(rootNode, DistanceMeasure.EUCLIDEAN, totalCost)
}
}

Expand Down Expand Up @@ -231,7 +237,46 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap
val rootNode = buildTree(rootId, nodes)
new BisectingKMeansModel(rootNode, distanceMeasure)
val totalCost = rootNode.leafNodes.map(_.cost).sum
new BisectingKMeansModel(rootNode, distanceMeasure, totalCost)
}
}

private[clustering] object SaveLoadV3_0 {
private[clustering] val thisFormatVersion = "3.0"

private[clustering]
val thisClassName = "org.apache.spark.mllib.clustering.BisectingKMeansModel"

def save(sc: SparkContext, model: BisectingKMeansModel, path: String): Unit = {
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)
~ ("trainingCost" -> model.trainingCost)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))

val data = getNodes(model.root).map(node => Data(node.index, node.size,
node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
node.children.map(_.index)))
spark.createDataFrame(data).write.parquet(Loader.dataPath(path))
}

def load(sc: SparkContext, path: String): BisectingKMeansModel = {
implicit val formats: DefaultFormats = DefaultFormats
val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
val rootId = (metadata \ "rootId").extract[Int]
val distanceMeasure = (metadata \ "distanceMeasure").extract[String]
val trainingCost = (metadata \ "trainingCost").extract[Double]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, can this read old model from previous version?

Copy link
Contributor

@WeichenXu123 WeichenXu123 Oct 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Could you avoid modifying loading model code in "mllib" package, but modifying code in "ml" package, i.e., the class ml.clustering.BisectingKMeansModel.BisectingKMeansModelReader, you can reference the KMeans code: ml.clustering.KMeansModel.KMeansModelReader.
    (Don't let ml.clustering.BisectingKMeansModel.BisectingKMeansModelReader call mllib.clustering.BisectingKMeansModel.load)

  • And, +1 with @viirya mentioned, we should keep model loading compatibility, add a version check (when >= 2.4) then we load "training cost" . Note that add these in ml.clustering.BisectingKMeansModel.BisectingKMeansModelReader.

  • And, could you also add version check (when >= 2.4) then we load "training cost" into ml.clustering.KMeansModel.KMeansModelReader ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do other models have this problem? I was told that this change just follows what we did for other models before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you all for the comments and sorry for the late answer. Just a couple of notes on your comments @WeichenXu123 (I may be missing something, so please correct me if I am wrong):

  • I checked the ml.clustering.KMeansModel.KMeansModelReader and it doesn't store anything related to the summary. Summary is not recovered after load of the model, so I don't see any reason why we should modify the read/load of ml.clustering.BisectingKMeansModel.BisectingKMeansModelReader;
  • this model can read from previous versions, since this is version "2.0", which was introduced for Spark 2.4; for previous versions, we read/write version "1.0"; the version check method for versioning is used only for the ml package, not in mllib where we have this versioning approach;

I was told that this change just follows what we did for other models before.

@cloud-fan Yes, let me link the PR for KMeans doing the same, which is: #20629.

Just a final comment which I hope clarifies which is the source of the confusion here and the reason of the above comments by @viirya and @WeichenXu123: trainingCost here is a member of the summary, not a parameter of the model for the ml.clustering.BisectingKMeansModel. Instead, it is a member of the model for mllib.clustering.BisectingKMeansModel (we have no summary notion there...). So storing it for mllib is needed in order for the model read after persisting it being the same of the original one (I think it doesn't pass UTs otherwise). Storing it for the ml, instead, it is not needed, because the summary is not persisted. If we want to persist also the summary for ml package I think we should best create a separate JIRA and PR for it.

Hope this clarifies (sorry for being so verbose).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this model can read from previous versions, since this is version "2.0", which was introduced for Spark 2.4; for previous versions, we read/write version "1.0"; the version check method for versioning is used only for the ml package, not in mllib where we have this versioning approach;

I meant that can it read old model from previous versions, not that this model can read from previous versions.

In other words, when reading a previous model without "trainingCost" in metadata, can this line work well?

val trainingCost = (metadata \ "trainingCost").extract[Double]

Copy link
Contributor

@WeichenXu123 WeichenXu123 Oct 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgaido91
(I haven't test this, so correct me if I am wrong).

I don't see (and think) this change breaks backwards compatibility for mllib.

I am suspicious of this line in load method:

val trainingCost = (metadata \ "trainingCost").extract[Double]

When loading an old version spark(e.g. spark 2.3.1) saved BisectingKMeansModel, because it do not contain "trainingCost" info, I guess this line will throw error. (Otherwise what will it return ?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WeichenXu123 I have explained it in and #22764 (comment). If you don't agree or believe on what I said you can try it.

A model saved in 2.3.1 will have "1.0" as version. So this code is not run. Every model from 2.4.0 on, will be saved with "2.0" as version, so it will have this stored. As mentioned, please notice that SaveLoadV2_0 was introduced for 2.4.0. Of course, if this commit won't go in 2.4, then I'll have to create a SaveLoadV3_0 in order to support it (or, if we agree that this doesn't need to be restored after model persistence, we can just ignore it).

Hope this clarifies. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am more confusing...

  • I think this line should be a already existed mistake. It is too weird.
    I think it should be case (SaveLoadV2_0.thisClassName, SaveLoadV2_0.thisFormatVersion) => val model = SaveLoadV2_0.load(sc, path)

  • Suppose you're right, then in which place your code call SaveLoadV2_0 ? I don't find it ... ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes @WeichenXu123 , you're right, that line is a bug. Thanks for noticing it. Anyway, that is going to be addressed in another PR and it is not (strictly) related to this one. The other option, as I mentioned, is that if we agree that this doesn't need to be restored after model persistence, we can just ignore it in save/load.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. After #22790 merged, I think this PR can work.

val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val rows = spark.read.parquet(Loader.dataPath(path))
Loader.checkSchema[Data](rows.schema)
val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap
val rootNode = buildTree(rootId, nodes)
new BisectingKMeansModel(rootNode, distanceMeasure, trainingCost)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
assert(clusterSizes.sum === numRows)
assert(clusterSizes.forall(_ >= 0))
assert(summary.numIter == 20)
assert(summary.trainingCost < 0.1)
assert(model.computeCost(dataset) == summary.trainingCost)

model.setSummary(None)
assert(!model.hasSummary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(model.k === sameModel.k)
assert(model.distanceMeasure === sameModel.distanceMeasure)
model.clusterCenters.zip(sameModel.clusterCenters).foreach(c => c._1 === c._2)
assert(model.trainingCost == sameModel.trainingCost)
} finally {
Utils.deleteRecursively(tempDir)
}
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ object MimaExcludes {

// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
// [SPARK-25765][ML] Add training cost to BisectingKMeans summary
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel.this"),

// [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError"),

Expand Down
12 changes: 11 additions & 1 deletion python/pyspark/ml/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ class BisectingKMeans(JavaEstimator, HasDistanceMeasure, HasFeaturesCol, HasPred
2
>>> summary.clusterSizes
[2, 2]
>>> summary.trainingCost
2.000...
>>> transformed = model.transform(df).select("features", "prediction")
>>> rows = transformed.collect()
>>> rows[0].prediction == rows[1].prediction
Expand Down Expand Up @@ -700,7 +702,15 @@ class BisectingKMeansSummary(ClusteringSummary):

.. versionadded:: 2.1.0
"""
pass

@property
@since("3.0.0")
def trainingCost(self):
"""
Sum of squared distances to the nearest centroid for all points in the training dataset.
This is equivalent to sklearn's inertia.
"""
return self._call_java("trainingCost")


@inherit_doc
Expand Down