mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
- with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
+ with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {

import MultilayerPerceptronClassifier._

@@ -54,6 +54,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
@Since("1.5.0")
final def getLayers: Array[Int] = $(layers)

+ /**
+ * Block size for stacking input data in matrices to speed up the computation.
+ * Data is stacked within partitions. If block size is more than remaining data in
+ * a partition then it is adjusted to the size of this data.
+ * Recommended size is between 10 and 1000.
+ * Default: 128
+ *
+ * @group expertParam
+ */
+ @Since("1.5.0")
+ final val blockSize: IntParam = new IntParam(this, "blockSize",
+ "Block size for stacking input data in matrices. Data is stacked within partitions." +
+ " If block size is more than remaining data in a partition then " +
+ "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
+ ParamValidators.gt(0))
+
+ /** @group expertGetParam */
+ @Since("1.5.0")
+ final def getBlockSize: Int = $(blockSize)
+
/**
* The solver algorithm for optimization.
* Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
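The restored doc comment above spells out the stacking rule that blockSize controls: rows are grouped within a partition, and the last block shrinks to whatever data remains. A plain-Python sketch of that rule (illustrative only, not Spark code; `stack_in_blocks` is a made-up name):

```python
from itertools import islice

def stack_in_blocks(rows, block_size=128):
    """Stack the rows of one partition into blocks of at most block_size.

    Mirrors the documented rule: when block_size exceeds the data remaining
    in the partition, the final block shrinks to whatever is left.
    """
    it = iter(rows)
    while True:
        block = list(islice(it, block_size))
        if not block:
            return
        yield block

# 10 rows with block_size=4 yield blocks of 4, 4, and 2.
print([len(b) for b in stack_in_blocks(range(10), block_size=4)])  # [4, 4, 2]
```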
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala (12 additions, 34 deletions)
@@ -54,8 +54,7 @@ import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel.
*/
- private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
- with HasBlockSize {
+ private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
/**
* Param for the column name for user ids. Ids must be integers. Other
* numeric types are supported for this column, but will be cast to integers as long as they
@@ -126,8 +125,6 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCol

/** @group expertGetParam */
def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
-
- setDefault(blockSize -> 4096)
}

/**
Expand Down Expand Up @@ -291,15 +288,6 @@ class ALSModel private[ml] (
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)

- /**
- * Set block size for stacking input data in matrices.
- * Default is 4096.
- *
- * @group expertSetParam
- */
- @Since("3.0.0")
- def setBlockSize(value: Int): this.type = set(blockSize, value)
-
private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
if (featuresA != null && featuresB != null) {
var dotProduct = 0.0f
@@ -363,7 +351,7 @@ class ALSModel private[ml] (
*/
@Since("2.2.0")
def recommendForAllUsers(numItems: Int): DataFrame = {
- recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+ recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
}

/**
@@ -378,7 +366,7 @@
@Since("2.3.0")
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
- recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+ recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
}

/**
@@ -389,7 +377,7 @@
*/
@Since("2.2.0")
def recommendForAllItems(numUsers: Int): DataFrame = {
- recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+ recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
}

/**
@@ -404,7 +392,7 @@
@Since("2.3.0")
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
- recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+ recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
}

/**
@@ -453,12 +441,11 @@ class ALSModel private[ml] (
dstFactors: DataFrame,
srcOutputColumn: String,
dstOutputColumn: String,
- num: Int,
- blockSize: Int): DataFrame = {
+ num: Int): DataFrame = {
import srcFactors.sparkSession.implicits._

- val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
- val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
+ val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+ val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap { case (srcIter, dstIter) =>
@@ -496,10 +483,11 @@

/**
* Blockifies factors to improve the efficiency of cross join
+ * TODO: SPARK-20443 - expose blockSize as a param?
*/
private def blockify(
factors: Dataset[(Int, Array[Float])],
- blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
+ blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
import factors.sparkSession.implicits._
factors.mapPartitions(_.grouped(blockSize))
}
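The grouped-iterator behavior that `blockify` relies on (`mapPartitions(_.grouped(blockSize))`) is easy to sketch outside Spark; a hedged Python equivalent, with the function name reused purely for illustration:

```python
def blockify(factors, block_size=4096):
    """Group an iterator of (id, factor_array) pairs into lists of at most
    block_size, like Scala's Iterator.grouped(blockSize) inside mapPartitions."""
    block = []
    for pair in factors:
        block.append(pair)
        if len(block) == block_size:
            yield block
            block = []
    if block:  # final, possibly smaller, block
        yield block
```

Cross-joining blocks rather than individual rows reduces the number of joined records (each block pair carries up to blockSize x blockSize candidate pairs), which is the cross-join efficiency the doc comment refers to.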
@@ -666,15 +654,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
@Since("2.2.0")
def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)

- /**
- * Set block size for stacking input data in matrices.
- * Default is 4096.
- *
- * @group expertSetParam
- */
- @Since("3.0.0")
- def setBlockSize(value: Int): this.type = set(blockSize, value)
-
/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
*
@@ -704,7 +683,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
instr.logDataset(dataset)
instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
- seed, intermediateStorageLevel, finalStorageLevel, blockSize)
+ seed, intermediateStorageLevel, finalStorageLevel)

val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -715,8 +694,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
checkpointInterval = $(checkpointInterval), seed = $(seed))
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
- val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
- .setParent(this)
+ val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
copyValues(model)
}

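At the API surface the recommend helpers keep their shape after this change; only the blockSize plumbing is gone, with blocking handled internally at the default of 4096. A sketch of the corresponding PySpark calls (assumes a fitted `model` and a `users_df` DataFrame containing the user id column; both names are placeholders):

```python
# Top 10 items for every user; blocking is internal after this change.
top_items = model.recommendForAllUsers(10)

# Top 5 items for just the users in users_df.
subset_recs = model.recommendForUserSubset(users_df, 5)
```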
python/pyspark/ml/classification.py (14 additions, 8 deletions)
@@ -2174,7 +2174,7 @@ def sigma(self):


class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
- HasTol, HasStepSize, HasSolver, HasBlockSize):
+ HasTol, HasStepSize, HasSolver):
"""
Params for :py:class:`MultilayerPerceptronClassifier`.

@@ -2185,6 +2185,11 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
"E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
"neurons and output layer of 10 neurons.",
typeConverter=TypeConverters.toListInt)
+ blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
+ "matrices. Data is stacked within partitions. If block size is more than " +
+ "remaining data in a partition then it is adjusted to the size of this " +
+ "data. Recommended size is between 10 and 1000, default is 128.",
+ typeConverter=TypeConverters.toInt)
solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
"options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2197,6 +2202,13 @@ def getLayers(self):
"""
return self.getOrDefault(self.layers)

+ @since("1.6.0")
+ def getBlockSize(self):
+ """
+ Gets the value of blockSize or its default value.
+ """
+ return self.getOrDefault(self.blockSize)
+
@since("2.0.0")
def getInitialWeights(self):
"""
@@ -2220,17 +2232,11 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPerceptronParams,
... (1.0, Vectors.dense([0.0, 1.0])),
... (1.0, Vectors.dense([1.0, 0.0])),
... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
- >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
+ >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
>>> mlp.setMaxIter(100)
MultilayerPerceptronClassifier...
>>> mlp.getMaxIter()
100
- >>> mlp.getBlockSize()
- 128
- >>> mlp.setBlockSize(1)
- MultilayerPerceptronClassifier...
- >>> mlp.getBlockSize()
- 1
>>> model = mlp.fit(df)
>>> model.setFeaturesCol("features")
MultilayerPerceptronClassificationModel...
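With blockSize back on the Python estimator, the doctest above again passes it in the constructor. The same flow as a condensed standalone sketch (assumes an active SparkSession bound to `spark`):

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (0.0, Vectors.dense([0.0, 0.0])),
    (1.0, Vectors.dense([0.0, 1.0])),
    (1.0, Vectors.dense([1.0, 0.0])),
    (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])

# blockSize=1 stacks nothing; the documented default is 128.
mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
print(mlp.getBlockSize())  # 1
model = mlp.fit(df)
```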
python/pyspark/ml/recommendation.py (6 additions, 23 deletions)
@@ -28,7 +28,7 @@


@inherit_doc
- class _ALSModelParams(HasPredictionCol, HasBlockSize):
+ class _ALSModelParams(HasPredictionCol):
"""
Params for :py:class:`ALS` and :py:class:`ALSModel`.

@@ -223,8 +223,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
- >>> model.getBlockSize()
- 4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
@@ -284,22 +282,21 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
"""
__init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=false, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
"""
super(ALS, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
- blockSize=4096)
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
kwargs = self._input_kwargs
self.setParams(**kwargs)

@@ -309,13 +306,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
"""
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
intermediateStorageLevel="MEMORY_AND_DISK", \
- finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+ finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
Sets params for ALS.
"""
kwargs = self._input_kwargs
@@ -446,13 +443,6 @@ def setSeed(self, value):
"""
return self._set(seed=value)

- @since("3.0.0")
- def setBlockSize(self, value):
- """
- Sets the value of :py:attr:`blockSize`.
- """
- return self._set(blockSize=value)
-

class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
"""
@@ -489,13 +479,6 @@ def setPredictionCol(self, value):
"""
return self._set(predictionCol=value)

- @since("3.0.0")
- def setBlockSize(self, value):
- """
- Sets the value of :py:attr:`blockSize`.
- """
- return self._set(blockSize=value)
-
@property
@since("1.4.0")
def rank(self):
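On the ALS side the revert removes blockSize from the Python API entirely, so construction uses only the parameters in the signatures above. A minimal end-to-end sketch (assumes an active SparkSession bound to `spark`; the ratings data is made up):

```python
from pyspark.ml.recommendation import ALS

ratings = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 0, 5.0)],
    ["user", "item", "rating"])

als = ALS(rank=10, maxIter=5, regParam=0.1, userCol="user",
          itemCol="item", ratingCol="rating", coldStartStrategy="nan")
model = als.fit(ratings)
top3 = model.recommendForAllUsers(3)  # block size is internal again
```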