Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.{Vector => OldVector}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel

/**
Expand Down Expand Up @@ -212,14 +212,34 @@ class FMClassifier @Since("3.0.0") (

if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)

val coefficients = trainImpl(data, numFeatures, LogisticLoss)
val (coefficients, objectiveHistory) = trainImpl(data, numFeatures, LogisticLoss)

val (intercept, linear, factors) = splitCoefficients(
coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))

if (handlePersistence) data.unpersist()

copyValues(new FMClassificationModel(uid, intercept, linear, factors))
createModel(dataset, intercept, linear, factors, objectiveHistory)
}

/**
 * Builds the fitted [[FMClassificationModel]] and attaches its training summary.
 *
 * @param dataset dataset that the summary model is applied to via `transform`.
 * @param intercept intercept term passed to the model constructor.
 * @param linear linear coefficients passed to the model constructor.
 * @param factors factor matrix passed to the model constructor.
 * @param objectiveHistory loss history handed to the training summary.
 * @return the result of `setSummary` on the newly created model.
 */
private def createModel(
    dataset: Dataset[_],
    intercept: Double,
    linear: Vector,
    factors: Matrix,
    objectiveHistory: Array[Double]): FMClassificationModel = {
  val fitted = copyValues(new FMClassificationModel(uid, intercept, linear, factors))
  // Fall back to the literal column name "weightCol" when the param is not defined.
  val weightName = if (isDefined(weightCol)) $(weightCol) else "weightCol"

  val (scoringModel, probabilityName, predictionName) = fitted.findSummaryModel()
  val scored = scoringModel.transform(dataset)
  val trainingSummary = new FMClassificationTrainingSummaryImpl(
    scored, probabilityName, predictionName, $(labelCol), weightName, objectiveHistory)
  fitted.setSummary(Some(trainingSummary))
}

@Since("3.0.0")
Expand All @@ -243,14 +263,36 @@ class FMClassificationModel private[classification] (
@Since("3.0.0") val linear: Vector,
@Since("3.0.0") val factors: Matrix)
extends ProbabilisticClassificationModel[Vector, FMClassificationModel]
with FMClassifierParams with MLWritable {
with FMClassifierParams with MLWritable
with HasTrainingSummary[FMClassificationTrainingSummary]{

@Since("3.0.0")
override val numClasses: Int = 2

@Since("3.0.0")
override val numFeatures: Int = linear.size

/**
 * Gets the summary of the model computed on the training set. An exception is thrown
 * if `hasSummary` is false.
 *
 * Delegates unchanged to the inherited implementation (`super.summary`).
 *
 * @return the training summary as a [[FMClassificationTrainingSummary]].
 */
@Since("3.1.0")
override def summary: FMClassificationTrainingSummary = super.summary

/**
 * Evaluates the model on a test dataset.
 *
 * @param dataset Test dataset to evaluate model on.
 * @return a summary built from the transformed `dataset`.
 */
@Since("3.1.0")
def evaluate(dataset: Dataset[_]): FMClassificationSummary = {
  // Fall back to the literal column name "weightCol" when the param is not defined.
  val weightName = if (isDefined(weightCol)) $(weightCol) else "weightCol"
  // Handle possible missing or invalid probability or prediction columns
  val (scoringModel, probabilityName, predictionName) = findSummaryModel()
  val scored = scoringModel.transform(dataset)
  new FMClassificationSummaryImpl(
    scored, probabilityName, predictionName, $(labelCol), weightName)
}

@Since("3.0.0")
override def predictRaw(features: Vector): Vector = {
val rawPrediction = getRawPrediction(features, intercept, linear, factors)
Expand Down Expand Up @@ -328,3 +370,53 @@ object FMClassificationModel extends MLReadable[FMClassificationModel] {
}
}
}

/**
 * Abstraction for FMClassifier results for a given model.
 *
 * Declares no members of its own; everything is inherited from `BinaryClassificationSummary`.
 */
sealed trait FMClassificationSummary extends BinaryClassificationSummary

/**
 * Abstraction for FMClassifier training results.
 *
 * Combines [[FMClassificationSummary]] with `TrainingSummary` and declares no members of
 * its own.
 */
sealed trait FMClassificationTrainingSummary extends FMClassificationSummary with TrainingSummary

/**
 * FMClassifier results for a given model.
 *
 * Concrete implementation of [[FMClassificationSummary]]; it only supplies the constructor
 * values below and defines no further members.
 *
 * @param predictions dataframe output by the model's `transform` method (marked `@transient`).
 * @param scoreCol field in "predictions" which gives the probability of each instance.
 * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
 *                      double.
 * @param labelCol field in "predictions" which gives the true label of each instance.
 * @param weightCol field in "predictions" which gives the weight of each instance.
 */
private class FMClassificationSummaryImpl(
@transient override val predictions: DataFrame,
override val scoreCol: String,
override val predictionCol: String,
override val labelCol: String,
override val weightCol: String)
extends FMClassificationSummary

/**
 * FMClassifier training results.
 *
 * Extends [[FMClassificationSummaryImpl]] by forwarding the first five constructor arguments
 * to it and adding `objectiveHistory`.
 *
 * @param predictions dataframe output by the model's `transform` method.
 * @param scoreCol field in "predictions" which gives the probability of each instance.
 * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
 *                      double.
 * @param labelCol field in "predictions" which gives the true label of each instance.
 * @param weightCol field in "predictions" which gives the weight of each instance.
 * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
 *                         NOTE(review): presumably the first entry is the initial state before
 *                         any update -- confirm against the optimizer's loss history.
 */
private class FMClassificationTrainingSummaryImpl(
predictions: DataFrame,
scoreCol: String,
predictionCol: String,
labelCol: String,
weightCol: String,
override val objectiveHistory: Array[Double])
extends FMClassificationSummaryImpl(
predictions, scoreCol, predictionCol, labelCol, weightCol)
with FMClassificationTrainingSummary
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.storage.StorageLevel
*/
private[ml] trait FactorizationMachinesParams extends PredictorParams
with HasMaxIter with HasStepSize with HasTol with HasSolver with HasSeed
with HasFitIntercept with HasRegParam {
with HasFitIntercept with HasRegParam with HasWeightCol {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added `with HasWeightCol` because ClassificationSummary uses weightCol. However, FM doesn't really support instance weights yet, so all weights default to 1.0.


/**
* Param for dimensionality of the factors (>= 0)
Expand Down Expand Up @@ -134,7 +134,7 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
data: RDD[(Double, OldVector)],
numFeatures: Int,
loss: String
): Vector = {
): (Vector, Array[Double]) = {

// initialize coefficients
val initialCoefficients = initCoefficients(numFeatures)
Expand All @@ -151,8 +151,8 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
.setRegParam($(regParam))
.setMiniBatchFraction($(miniBatchFraction))
.setConvergenceTol($(tol))
val coefficients = optimizer.optimize(data, initialCoefficients)
coefficients.asML
val (coefficients, lossHistory) = optimizer.optimizeWithLossReturned(data, initialCoefficients)
(coefficients.asML, lossHistory)
}
}

Expand Down Expand Up @@ -421,7 +421,7 @@ class FMRegressor @Since("3.0.0") (

if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)

val coefficients = trainImpl(data, numFeatures, SquaredError)
val (coefficients, _) = trainImpl(data, numFeatures, SquaredError)

val (intercept, linear, factors) = splitCoefficients(
coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,20 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va
* @return solution vector
*/
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = GradientDescent.runMiniBatchSGD(
val (weights, _) = optimizeWithLossReturned(data, initialWeights)
weights
}

/**
* Runs gradient descent on the given training data.
* @param data training data
* @param initialWeights initial weights
* @return solution vector and loss value in an array
*/
def optimizeWithLossReturned(
data: RDD[(Double, Vector)],
initialWeights: Vector): (Vector, Array[Double]) = {
GradientDescent.runMiniBatchSGD(
data,
gradient,
updater,
Expand All @@ -139,7 +152,6 @@ class GradientDescent private[spark] (private var gradient: Gradient, private va
miniBatchFraction,
initialWeights,
convergenceTol)
weights
}

}
Expand Down Expand Up @@ -195,7 +207,7 @@ object GradientDescent extends Logging {
s"numIterations=$numIterations and miniBatchFraction=$miniBatchFraction")
}

val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
val stochasticLossHistory = new ArrayBuffer[Double](numIterations + 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make stochasticLossHistory contain the initial state plus the state after each iteration, so that it is consistent with the objectiveHistory in LogisticRegression and LinearRegression.

// Record previous weight and current one to calculate solution vector difference

var previousWeights: Option[Vector] = None
Expand Down Expand Up @@ -226,7 +238,7 @@ object GradientDescent extends Logging {

var converged = false // indicates whether converged based on convergenceTol
var i = 1
while (!converged && i <= numIterations) {
while (!converged && (i <= numIterations + 1)) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
Expand All @@ -249,17 +261,19 @@ object GradientDescent extends Logging {
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory += lossSum / miniBatchSize + regVal
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
converged = isConverged(previousWeights.get,
currentWeights.get, convergenceTol)
if (i != (numIterations + 1)) {
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
converged = isConverged(previousWeights.get,
currentWeights.get, convergenceTol)
}
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
Expand All @@ -271,7 +285,6 @@ object GradientDescent extends Logging {
stochasticLossHistory.takeRight(10).mkString(", ")))

(weights, stochasticLossHistory.toArray)

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,14 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
}

override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = LBFGS.runLBFGS(
val (weights, _) = optimizeWithLossReturned(data, initialWeights)
weights
}

def optimizeWithLossReturned(
data: RDD[(Double, Vector)],
initialWeights: Vector): (Vector, Array[Double]) = {
LBFGS.runLBFGS(
data,
gradient,
updater,
Expand All @@ -145,9 +152,7 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
maxNumIterations,
regParam,
initialWeights)
weights
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,32 @@ class FMClassifierSuite extends MLTest with DefaultReadWriteTest {
testPredictionModelSinglePrediction(fmModel, smallBinaryDataset)
}

// The training summary and an evaluation on the same dataset must report equal metrics.
test("summary and training summary") {
  val model = new FMClassifier().setMaxIter(5).fit(smallBinaryDataset)

  val evaluated = model.evaluate(smallBinaryDataset)
  val training = model.summary

  assert(training.accuracy === evaluated.accuracy)
  assert(training.weightedPrecision === evaluated.weightedPrecision)
  assert(training.weightedRecall === evaluated.weightedRecall)
  assert(training.pr.collect() === evaluated.pr.collect())
  assert(training.roc.collect() === evaluated.roc.collect())
  assert(training.areaUnderROC === evaluated.areaUnderROC)
}

// totalIterations must equal maxIter when maxIter is 1, and never exceed it otherwise.
test("FMClassifier training summary totalIterations") {
  for (maxIter <- Seq(1, 5, 10, 20, 100)) {
    val model = new FMClassifier().setMaxIter(maxIter).fit(smallBinaryDataset)
    maxIter match {
      case 1 => assert(model.summary.totalIterations === maxIter)
      case _ => assert(model.summary.totalIterations <= maxIter)
    }
  }
}

test("read/write") {
def checkModelData(
model: FMClassificationModel,
Expand Down
48 changes: 46 additions & 2 deletions python/pyspark/ml/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
'NaiveBayes', 'NaiveBayesModel',
'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
'OneVsRest', 'OneVsRestModel',
'FMClassifier', 'FMClassificationModel']
'FMClassifier', 'FMClassificationModel', 'FMClassificationSummary',
'FMClassificationTrainingSummary']


class _ClassifierParams(HasRawPredictionCol, _PredictorParams):
Expand Down Expand Up @@ -3226,7 +3227,7 @@ def setRegParam(self, value):


class FMClassificationModel(_JavaProbabilisticClassificationModel, _FactorizationMachinesParams,
JavaMLWritable, JavaMLReadable):
JavaMLWritable, JavaMLReadable, HasTrainingSummary):
"""
Model fitted by :class:`FMClassifier`.

Expand Down Expand Up @@ -3257,6 +3258,49 @@ def factors(self):
"""
return self._call_java("factors")

@since("3.1.0")
def summary(self):
    """
    Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model
    trained on the training set. A RuntimeError is raised if `hasSummary` is false.
    """
    # Guard first: without a training summary there is nothing to wrap.
    if not self.hasSummary:
        raise RuntimeError("No training summary available for this %s" %
                           self.__class__.__name__)
    java_summary = super(FMClassificationModel, self).summary
    return FMClassificationTrainingSummary(java_summary)

@since("3.1.0")
def evaluate(self, dataset):
    """
    Evaluates the model on a test dataset.

    :param dataset:
      Test dataset to evaluate model on, where dataset is an
      instance of :py:class:`pyspark.sql.DataFrame`
    """
    if isinstance(dataset, DataFrame):
        # Wrap the result of the Java-side `evaluate` call.
        return FMClassificationSummary(self._call_java("evaluate", dataset))
    raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))


class FMClassificationSummary(_BinaryClassificationSummary):
    """
    Abstraction for FMClassifier Results for a given model.

    .. versionadded:: 3.1.0
    """
    pass


@inherit_doc
class FMClassificationTrainingSummary(FMClassificationSummary, _TrainingSummary):
    """
    Abstraction for FMClassifier Training results.

    .. versionadded:: 3.1.0
    """
    pass


if __name__ == "__main__":
import doctest
Expand Down
Loading