@@ -219,11 +219,6 @@ class KMeans private (
       data: RDD[(Vector, Double)],
       instr: Option[Instrumentation]): KMeansModel = {
 
-    if (data.getStorageLevel == StorageLevel.NONE) {
-      logWarning("The input data is not directly cached, which may hurt performance if its"
-        + " parent RDDs are also uncached.")
-    }
-
     // Compute squared norms and cache them.
     val norms = data.map { case (v, _) =>
       Vectors.norm(v, 2.0)
@@ -232,15 +227,13 @@ class KMeans private (
     val zippedData = data.zip(norms).map { case ((v, w), norm) =>
       (new VectorWithNorm(v, norm), w)
     }
-    zippedData.persist(StorageLevel.MEMORY_AND_DISK)
-    val model = runAlgorithmWithWeight(zippedData, instr)
-    zippedData.unpersist()
-
-    // Warn at the end of the run as well, for increased visibility.
+
     if (data.getStorageLevel == StorageLevel.NONE) {

Hi, I was testing Spark k-means. There seems to be an issue here: no matter whether we persist the parent RDD, data.getStorageLevel will always be NONE because of the following operation, and this causes double caching.

def run(data: RDD[Vector]): KMeansModel = {
  val instances: RDD[(Vector, Double)] = data.map {
    case (point) => (point, 1.0)
  }
  runWithWeight(instances, None)
}
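A minimal standalone sketch of the behavior described above, assuming only a running SparkContext sc (e.g. in spark-shell): any map produces a new, uncached RDD, so the storage-level check inside runWithWeight never sees the parent's cache.

import org.apache.spark.storage.StorageLevel

// The parent RDD is cached, but the RDD produced by map is a new RDD
// whose storage level defaults to NONE.
val parent = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_AND_DISK)
val instances = parent.map(i => (i, 1.0))

assert(parent.getStorageLevel == StorageLevel.MEMORY_AND_DISK)
assert(instances.getStorageLevel == StorageLevel.NONE) // the check in runWithWeight fires

Persisting instances (or data derived from it) again inside runWithWeight then keeps a second copy of the same points, which is the double caching the comment mentions.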

logWarning("The input data was not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
zippedData.persist(StorageLevel.MEMORY_AND_DISK)
}
Member
I guess we can remove the two warnings in this method? It's not a big deal now if the source data is uncached.

Contributor Author
Done

Contributor
What about caching norms if data is already cached? Like this:

val handlePersistence = data.getStorageLevel == StorageLevel.NONE
val norms = ...
val zippedData = if (handlePersistence) {
  data.zip(norms).map { case ((v, w), norm) =>
    (new VectorWithNorm(v, norm), w)
  }.persist(StorageLevel.MEMORY_AND_DISK)
} else {
  norms.persist(StorageLevel.MEMORY_AND_DISK)
  data.zip(norms).map { case ((v, w), norm) =>
    (new VectorWithNorm(v, norm), w)
  }
}

...

if (handlePersistence) {
  zippedData.unpersist()
} else {
  norms.unpersist()
}
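A note on the tradeoff in this sketch: norms holds a single Double per input vector, so persisting it is cheap; the else branch avoids recomputing every norm on each k-means pass while not duplicating the caller's already-cached vectors in memory.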

Contributor Author (@amanomer), Dec 31, 2019
> What about caching norms if data is already cached?

Won't this lead to the double-caching problem which we are trying to avoid?

Member
Yeah, I thought that was your point. If zippedData were expensive to compute, I'd agree that caching the intermediate values too is worthwhile, and we do that in some places. Here it's not, and the original behavior was to always cache internally, so I guess this is less of a change. This at least skips the internal caching where zippedData can be inexpensively recomputed.

+    val model = runAlgorithmWithWeight(zippedData, instr)
+    zippedData.unpersist()
 
     model
   }
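Putting the hunks together, the method after this change should read roughly as follows (a reconstruction from the diff above, not a quote of the merged file; names are those of the surrounding KMeans class):

private[spark] def runWithWeight(
    data: RDD[(Vector, Double)],
    instr: Option[Instrumentation]): KMeansModel = {

  // Compute squared norms and zip them with the points and their weights.
  val norms = data.map { case (v, _) => Vectors.norm(v, 2.0) }
  val zippedData = data.zip(norms).map { case ((v, w), norm) =>
    (new VectorWithNorm(v, norm), w)
  }

  // Cache internally only when the caller's input is uncached, so an
  // already-cached input is not duplicated in memory; no warning either way.
  if (data.getStorageLevel == StorageLevel.NONE) {
    zippedData.persist(StorageLevel.MEMORY_AND_DISK)
  }
  val model = runAlgorithmWithWeight(zippedData, instr)
  zippedData.unpersist()

  model
}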
