Commits (39)
0b3f053
jecos integrated as the default qpsolve in ALS; implicit tests are fa…
Jun 13, 2014
8ba4871
Qp options added to Spark ALS: unbounded Qp, Qp with pos, Qp with bou…
Jun 18, 2014
dd912db
Prepared branch of ALS-QP feature/runtime testing
Jun 19, 2014
6dd320b
QpProblem drivers in MovieLensALS
Jun 20, 2014
48023c8
debug option for octave quadprog validation
Jun 21, 2014
e7e64b7
L1 option added to ALS; Driver added to MovieLensALS
Jun 21, 2014
84f1d67
Qp with equality and bounds added to option 4 of ECOS based QpSolver
Jun 21, 2014
90bca10
Movielens runtime experiments for Spark Summit talk
Jun 30, 2014
4e2c623
ADMM based QuadraticMinimizer in mllib.optimization;Used in ALS
Jul 15, 2014
3f93ee5
Refactored to use com.github.ecos package
Jul 16, 2014
f288846
moved interior point based qp-als to feature/ipmqp-als branch; prepar…
Jul 18, 2014
21d7990
license cleanup; Copyright added to NOTICE
Aug 2, 2014
13cb89b
Merge with HEAD
Aug 5, 2014
a12d92a
BSD license for Proximal algorithms
Aug 7, 2014
02199a8
LICENSE and NOTICE updates as per Legal
Aug 8, 2014
f43ed66
Merge with master
Aug 9, 2014
c03dbed
Merge branch 'feature/qp-als' of https://istg.vzvisp.com:8443/stash/s…
Aug 13, 2014
c9d1fbf
Redesign of ALS API; userConstraint and productConstraint separated;
Oct 8, 2014
f2cab3e
delimiter added to MovieLensALS example;rho tuning in QuadraticMinimi…
Oct 29, 2014
b2c9dac
rho as sqrt(eigenMin*eigenMax) for sparse, sqrt(eigenMax) for other f…
Oct 30, 2014
c01f3e3
NNLS bug2
Oct 31, 2014
a941207
removed ecos dependency from mllib pom
Nov 1, 2014
9b3951f
validate user/product on MovieLens dataset through user input and com…
Nov 5, 2014
cd3ab31
merged with AbstractParams serialization bug
Nov 5, 2014
4bbae0f
comments fixed as per scalastyle
Nov 5, 2014
9fa063e
import scala.math.round
Nov 6, 2014
10cbb37
provide ratio for topN product validation; generate MAP and prec@k me…
Nov 8, 2014
f38a1b5
use sampleByKey for per user sampling
Nov 8, 2014
d144f57
recommendAll API to MatrixFactorizationModel, uses topK finding using…
Nov 12, 2014
1e7e36e
Updated qp-als with irmetrics for experiments
Nov 12, 2014
38b740e
Added Apache license on Constraint.scala
Nov 12, 2014
7163a5c
Added API for batch user and product recommendation; MAP calculation …
Nov 19, 2014
34fbf21
merge with irmetrics for experiments
Nov 20, 2014
10bf72d
PLSA formulation with least square loss converges better with rho fro…
Nov 23, 2014
1cbe0cc
merge with ml.ALS refactoring; comparisons with Breeze QuadraticMinim…
Mar 22, 2015
a386c3e
Added testcases for constraints; Updated ALS to run with userConstrai…
Mar 23, 2015
196d8c8
merged with master; added memory optimization for upper triangular gram
Mar 23, 2015
1848181
merged with breeze-0.11.2; added testcases for positivity, bounds, sp…
Mar 28, 2015
33b5a97
LICENSE and NOTICE cleaned as the code moved to Breeze
Mar 28, 2015
@@ -76,9 +76,12 @@ public static void main(String[] args) {
JavaRDD<String> lines = sc.textFile(args[0]);

JavaRDD<Rating> ratings = lines.map(new ParseRating());

MatrixFactorizationModel model = ALS.train(ratings.rdd(), rank, iterations, 0.01, blocks);


MatrixFactorizationModel model = new ALS().setRank(rank)
.setIterations(iterations)
.setBlocks(blocks)
.run(ratings.rdd());

model.userFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
outputDir + "/userFeatures");
model.productFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
@@ -18,12 +18,10 @@
package org.apache.spark.examples.mllib

import scala.collection.mutable

import org.apache.log4j.{Level, Logger}
import scopt.OptionParser

import breeze.optimize.proximal.Constraint
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

@@ -37,19 +35,25 @@ import org.apache.spark.rdd.RDD
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object MovieLensALS {

case class Params(
input: String = null,
kryo: Boolean = false,
numIterations: Int = 20,
lambda: Double = 1.0,
rank: Int = 10,
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false) extends AbstractParams[Params]
input: String = null,
kryo: Boolean = false,
numIterations: Int = 20,
userConstraint: String = "SMOOTH",
productConstraint: String = "SMOOTH",
userLambda: Double = 1.0,
productLambda: Double = 1.0,
rank: Int = 10,
delimiter: String = "::",
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false) extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()

val userConstraints = Constraint.values.toList.mkString(",")
val productConstraints = Constraint.values.toList.mkString(",")

val parser = new OptionParser[Params]("MovieLensALS") {
head("MovieLensALS: an example app for ALS on MovieLens data.")
@@ -59,9 +63,21 @@ object MovieLensALS {
opt[Int]("numIterations")
.text(s"number of iterations, default: ${defaultParams.numIterations}")
.action((x, c) => c.copy(numIterations = x))
opt[Double]("lambda")
.text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
.action((x, c) => c.copy(lambda = x))
opt[String]("userConstraint")
.text(s"user constraint options ${userConstraints} default: SMOOTH")
.action((x, c) => c.copy(userConstraint = x))
opt[String]("productConstraint")
.text(s"product constraint options ${productConstraints} default: SMOOTH")
.action((x, c) => c.copy(productConstraint = x))
opt[Double]("lambdaUser")
.text(s"lambda for user regularization, default: ${defaultParams.userLambda}")
.action((x, c) => c.copy(userLambda = x))
opt[Double]("lambdaProduct")
.text(s"lambda for product regularization, default: ${defaultParams.productLambda}")
.action((x, c) => c.copy(productLambda = x))
opt[String]("delimiter")
.text(s"sparse dataset delimiter, default: ${defaultParams.delimiter}")
.action((x, c) => c.copy(delimiter = x))
opt[Unit]("kryo")
.text("use Kryo serialization")
.action((_, c) => c.copy(kryo = true))
@@ -84,7 +100,8 @@ object MovieLensALS {
|
| bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \
| examples/target/scala-*/spark-examples-*.jar \
| --rank 5 --numIterations 20 --lambda 1.0 --kryo \
| --rank 5 --numIterations 20 --userConstraint SMOOTH --productConstraint SPARSE \
| --lambdaUser 0.01 --lambdaProduct 1.0 --kryo \
| data/mllib/sample_movielens_data.txt
""".stripMargin)
}
@@ -107,9 +124,10 @@ object MovieLensALS {
Logger.getRootLogger.setLevel(Level.WARN)

val implicitPrefs = params.implicitPrefs

val delimiter = params.delimiter

val ratings = sc.textFile(params.input).map { line =>
val fields = line.split("::")
val fields = line.split(delimiter)
if (implicitPrefs) {
/*
* MovieLens ratings are on a scale of 1-5:
@@ -136,9 +154,10 @@
val numMovies = ratings.map(_.product).distinct().count()

println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

val splits = ratings.randomSplit(Array(0.8, 0.2))
val splits = ratings.randomSplit(Array(0.8, 0.2), 1L)
val training = splits(0).cache()

val test = if (params.implicitPrefs) {
/*
* 0 means "don't know" and positive values mean "confident that the prediction should be 1".
@@ -158,14 +177,23 @@

ratings.unpersist(blocking = false)

val model = new ALS()
val userConstraint = Constraint.withName(params.userConstraint)
val productConstraint = Constraint.withName(params.productConstraint)

val als = new ALS()
.setRank(params.rank)
.setIterations(params.numIterations)
.setLambda(params.lambda)
.setUserConstraint(userConstraint)
.setProductConstraint(productConstraint)
.setUserLambda(params.userLambda)
.setProductLambda(params.productLambda)
.setImplicitPrefs(params.implicitPrefs)
.setUserBlocks(params.numUserBlocks)
.setProductBlocks(params.numProductBlocks)
.run(training)

println(s"ALS with userConstraint ${userConstraint} productConstraint ${productConstraint}")

val model = als.run(training)

val rmse = computeRmse(model, test, params.implicitPrefs)

@@ -179,11 +207,11 @@
: Double = {

def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r

val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map{ x =>
val predictionsAndRatings = predictions.map { x =>
((x.user, x.product), mapPredictedRating(x.rating))
}.join(data.map(x => ((x.user, x.product), x.rating))).values

math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
}
}
119 changes: 103 additions & 16 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.recommendation
import java.{util => ju}
import java.io.IOException

import breeze.optimize.proximal.{ProximalL1, QuadraticMinimizer}
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Sorting
@@ -43,6 +44,9 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
import org.apache.spark.util.random.XORShiftRandom
import breeze.optimize.proximal.Constraint._
import breeze.linalg.{DenseVector => BrzVector}
import breeze.linalg.{DenseMatrix => BrzMatrix}

/**
* Common params for ALS.
@@ -72,8 +76,7 @@ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasR
* Param for number of item blocks.
* @group param
*/
val numItemBlocks =
new IntParam(this, "numItemBlocks", "number of item blocks", Some(10))
val numItemBlocks = new IntParam(this, "numItemBlocks", "number of item blocks", Some(10))

/** @group getParam */
def getNumItemBlocks: Int = get(numItemBlocks)
@@ -289,11 +292,17 @@ class ALS extends Estimator[ALSModel] with ALSParams {
.map { row =>
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}

val userRegParam = map(regParam)
val userConstraint = if (map(nonnegative)) POSITIVE else SMOOTH

val (userFactors, itemFactors) = ALS.train(ratings, rank = map(rank),
numUserBlocks = map(numUserBlocks), numItemBlocks = map(numItemBlocks),
maxIter = map(maxIter), regParam = map(regParam), implicitPrefs = map(implicitPrefs),
alpha = map(alpha), nonnegative = map(nonnegative),
checkpointInterval = map(checkpointInterval))
maxIter = map(maxIter),
userRegParam = userRegParam, itemRegParam = userRegParam,
implicitPrefs = map(implicitPrefs),
alpha = map(alpha),
userConstraint = userConstraint, itemConstraint = userConstraint)
val model = new ALSModel(this, map, map(rank), userFactors, itemFactors)
Params.inheritValues(map, this, model)
model
@@ -324,6 +333,58 @@ object ALS extends Logging {
def solve(ne: NormalEquation, lambda: Double): Array[Float]
}

/** QuadraticMinimization solver for least square problems. */
private[recommendation] class QuadraticSolver(rank: Int, constraint: Constraint)
extends LeastSquaresNESolver {
private val qm = QuadraticMinimizer(rank, constraint)
private val init = qm.initialize
// Elastic Net beta parameter for L1 regularization
private val beta = if (constraint == SPARSE) 0.99 else 0.0

/** Quadratic Minimization solver for least-squares problems with non-smooth constraints (e.g. L1)
*
* minimize 0.5 x'Hx + c'x + g(z)
* s.t. Aeq * x = beq
*
* Affine constraints are optional. Supported g(z) are one of the following:
*
* 1. z >= 0
* 2. lb <= z <= ub
* 3. 1'z = s, s >= 0
* 4. lambda * L1(z)
*
* TODO: add the remaining constraints
*
* @param ne a [[NormalEquation]] instance that contains AtA, Atb, and n (number of instances)
* @param lambda regularization constant, which will be scaled by n
* @return the solution x
*/
override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
require(ne.k == rank, s"QuadraticSolver: expected rank $rank but got ${ne.k}")

// If the Elastic Net formulation is being run, give (1 - beta) * lambda to L2 and
// beta * lambda to L1. The nomenclature used here is exactly the same as in GLMNET.
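// Illustrative arithmetic (not part of the original patch), using assumed values
// lambda = 1.0 and n = 100 with the beta = 0.99 split defined above:
//   L2 share added to each gram diagonal entry: (1 - 0.99) * 1.0 * 100 = 1.0
//   L1 share handed to the ProximalL1 operator:  0.99 * 1.0 * 100 = 99.0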
val scaledlambda = lambda * (1 - beta) * ne.n
var i = 0
var j = 2
while (i < ne.triK) {
ne.ata(i) += scaledlambda
i += j
j += 1
}
if (constraint == SPARSE) {
val regParamL1 = beta * lambda * ne.n
qm.getProximal.asInstanceOf[ProximalL1].setLambda(regParamL1)
}
val q = new BrzVector(ne.atb)
q *= -1.0
val x = qm.minimize(ne.ata, q, init)
ne.reset()
x.data.map(_.toFloat)
}
}
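
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): how the QuadraticSolver above
// feeds a regularized normal equation into breeze's QuadraticMinimizer. It only
// mirrors the calls made in solve(); the stand-in gram/linear values and the
// exact minimize/initialize signatures are assumptions, not verified Breeze API.
//
//   import breeze.linalg.{DenseVector => BrzVector}
//   import breeze.optimize.proximal.Constraint._
//   import breeze.optimize.proximal.QuadraticMinimizer
//
//   val rank = 2
//   val qm = QuadraticMinimizer(rank, POSITIVE)  // same factory call as the constructor above
//   val ata = Array(2.0, 0.5, 1.0)               // packed upper-triangular gram (AtA + lambda * n on the diagonal)
//   val q = new BrzVector(Array(-1.0, -0.5))     // q = -Atb
//   val x = qm.minimize(ata, q, qm.initialize)   // solution under the z >= 0 constraint
// ---------------------------------------------------------------------------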

/** Cholesky solver for least square problems. */
private[recommendation] class CholeskySolver extends LeastSquaresNESolver {

@@ -496,10 +557,12 @@ object ALS extends Logging {
numUserBlocks: Int = 10,
numItemBlocks: Int = 10,
maxIter: Int = 10,
regParam: Double = 1.0,
userRegParam: Double = 1.0,
itemRegParam: Double = 1.0,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
nonnegative: Boolean = false,
userConstraint: Constraint = SMOOTH,
itemConstraint: Constraint = SMOOTH,
intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
checkpointInterval: Int = 10,
@@ -512,7 +575,27 @@
val itemPart = new ALSPartitioner(numItemBlocks)
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
val solver = System.getenv("solver")

val userSolver =
if (userConstraint == POSITIVE) new NNLSSolver
else {
if (solver == "mllib") new CholeskySolver
else {
println(s"QuadraticSolver for users with constraint ${userConstraint.toString}")
new QuadraticSolver(rank, userConstraint)
}
}

val itemSolver =
if (itemConstraint == POSITIVE) new NNLSSolver
else {
if (solver == "mllib") new CholeskySolver
else {
println(s"QuadraticSolver for items with constraint ${itemConstraint.toString}")
new QuadraticSolver(rank, itemConstraint)
}
}
val blockRatings = partitionRatings(ratings, userPart, itemPart)
.persist(intermediateRDDStorageLevel)
val (userInBlocks, userOutBlocks) =
@@ -546,17 +629,17 @@
for (iter <- 1 to maxIter) {
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
val previousItemFactors = itemFactors
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, implicitPrefs, alpha, solver)
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, itemRegParam,
userLocalIndexEncoder, implicitPrefs, alpha, itemSolver)
previousItemFactors.unpersist()
itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
// TODO: Generalize PeriodicGraphCheckpointer and use it here.
if (shouldCheckpoint(iter)) {
itemFactors.checkpoint() // itemFactors gets materialized in computeFactors.
}
val previousUserFactors = userFactors
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, implicitPrefs, alpha, solver)
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, userRegParam,
itemLocalIndexEncoder, implicitPrefs, alpha, userSolver)
if (shouldCheckpoint(iter)) {
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
@@ -565,16 +648,16 @@
}
} else {
for (iter <- 0 until maxIter) {
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, solver = solver)
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, itemRegParam,
userLocalIndexEncoder, solver = itemSolver)
if (shouldCheckpoint(iter)) {
itemFactors.checkpoint()
itemFactors.count() // checkpoint item factors and cut lineage
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, solver = solver)
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, userRegParam,
itemLocalIndexEncoder, solver = userSolver)
}
}
val userIdAndFactors = userInBlocks
@@ -1102,6 +1185,7 @@ object ALS extends Logging {
dstInBlocks.join(merged).mapValues {
case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors) =>
val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks)
var solveTime = 0.0
srcFactors.foreach { case (srcBlockId, factors) =>
sortedSrcFactors(srcBlockId) = factors
}
@@ -1127,9 +1211,12 @@
}
i += 1
}
val startTime = System.nanoTime()
dstFactors(j) = solver.solve(ls, regParam)
solveTime += (System.nanoTime() - startTime)
j += 1
}
logInfo(s"solveTime ${solveTime/1e6} ms")
dstFactors
}
}