Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] {
("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))

// We want to partition the model in partitions of size 32MB
val partitionSize = (1L << 25)
// We want to partition the model in partitions smaller than
// spark.kryoserializer.buffer.max
val bufferSize = Utils.byteStringAsBytes(
spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
// We calculate the approximate size of the model
// We only calculate the array size, not considering
// the string size, the formula is:
// floatSize * numWords * vectorSize
val approxSize = 4L * numWords * vectorSize
val nPartitions = ((approxSize / partitionSize) + 1).toInt
// We only calculate the array size, considering an
// average string size of 15 bytes, the formula is:
// (floatSize * vectorSize + 15) * numWords
val approxSize = (4L * vectorSize + 15) * numWords
val nPartitions = ((approxSize / bufferSize) + 1).toInt
val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,40 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {

}

ignore("big model load / save") {
// create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25
val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*)
test("big model load / save") {
// backupping old values
val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m")
val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k")

// setting test values to trigger partitioning
spark.conf.set("spark.kryoserializer.buffer", "50b")
spark.conf.set("spark.kryoserializer.buffer.max", "50b")

// create a model bigger than 50 Bytes
val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*)
val model = new Word2VecModel(word2VecMap)

// est. size of this model, given the formula:
// (floatSize * vectorSize + 15) * numWords
// (4 * 10 + 15) * 10 = 550
// therefore it should generate multiple partitions
val tempDir = Utils.createTempDir()
val path = tempDir.toURI.toString

try {
model.save(sc, path)
val sameModel = Word2VecModel.load(sc, path)
assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq))
}
catch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: put on previous line

case t: Throwable => fail("exception thrown persisting a model " +
"that spans over multiple partitions", t)
} finally {
Utils.deleteRecursively(tempDir)
spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue)
spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue)
}

}

test("test similarity for word vectors with large values is not Infinity or NaN") {
Expand Down