Skip to content

Commit 35aa244

Browse files
techaddict authored and rxin committed
SPARK-1668: Add implicit preference as an option to examples/MovieLensALS
Add --implicitPrefs as a command-line option to the example app MovieLensALS under examples/ Author: Sandeep <sandeep@techaddict.me> Closes #597 from techaddict/SPARK-1668 and squashes the following commits: 8b371dc [Sandeep] Second Pass on reviews by mengxr eca9d37 [Sandeep] based on mengxr's suggestions 937e54c [Sandeep] Changes 5149d40 [Sandeep] Changes based on review 1dd7657 [Sandeep] use mean() 42444d7 [Sandeep] Based on Suggestions by mengxr e3082fa [Sandeep] SPARK-1668: Add implicit preference as an option to examples/MovieLensALS Add --implicitPrefs as a command-line option to the example app MovieLensALS under examples/ (cherry picked from commit 108c4c1) Signed-off-by: Reynold Xin <rxin@apache.org>
1 parent c7b2704 commit 35aa244

File tree

1 file changed

+46
-9
lines changed

1 file changed

+46
-9
lines changed

examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ object MovieLensALS {
4343
kryo: Boolean = false,
4444
numIterations: Int = 20,
4545
lambda: Double = 1.0,
46-
rank: Int = 10)
46+
rank: Int = 10,
47+
implicitPrefs: Boolean = false)
4748

4849
def main(args: Array[String]) {
4950
val defaultParams = Params()
@@ -62,6 +63,9 @@ object MovieLensALS {
6263
opt[Unit]("kryo")
6364
.text(s"use Kryo serialization")
6465
.action((_, c) => c.copy(kryo = true))
66+
opt[Unit]("implicitPrefs")
67+
.text("use implicit preference")
68+
.action((_, c) => c.copy(implicitPrefs = true))
6569
arg[String]("<input>")
6670
.required()
6771
.text("input paths to a MovieLens dataset of ratings")
@@ -88,7 +92,25 @@ object MovieLensALS {
8892

8993
val ratings = sc.textFile(params.input).map { line =>
9094
val fields = line.split("::")
91-
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
95+
if (params.implicitPrefs) {
96+
/*
97+
* MovieLens ratings are on a scale of 1-5:
98+
* 5: Must see
99+
* 4: Will enjoy
100+
* 3: It's okay
101+
* 2: Fairly bad
102+
* 1: Awful
103+
* So we should not recommend a movie if the predicted rating is less than 3.
104+
* To map ratings to confidence scores, we use
105+
* 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mapping means unobserved
106+
* entries are generally between It's okay and Fairly bad.
107+
* The semantics of 0 in this expanded world of non-positive weights
108+
* are "the same as never having interacted at all".
109+
*/
110+
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
111+
} else {
112+
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
113+
}
92114
}.cache()
93115

94116
val numRatings = ratings.count()
@@ -99,7 +121,18 @@ object MovieLensALS {
99121

100122
val splits = ratings.randomSplit(Array(0.8, 0.2))
101123
val training = splits(0).cache()
102-
val test = splits(1).cache()
124+
val test = if (params.implicitPrefs) {
125+
/*
126+
* 0 means "don't know" and positive values mean "confident that the prediction should be 1".
127+
* Negative values mean "confident that the prediction should be 0".
128+
* We have in this case used some kind of weighted RMSE. The weight is the absolute value of
129+
* the confidence. The error is the difference between prediction and either 1 or 0,
130+
* depending on whether r is positive or negative.
131+
*/
132+
splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))
133+
} else {
134+
splits(1)
135+
}.cache()
103136

104137
val numTraining = training.count()
105138
val numTest = test.count()
@@ -111,21 +144,25 @@ object MovieLensALS {
111144
.setRank(params.rank)
112145
.setIterations(params.numIterations)
113146
.setLambda(params.lambda)
147+
.setImplicitPrefs(params.implicitPrefs)
114148
.run(training)
115149

116-
val rmse = computeRmse(model, test, numTest)
150+
val rmse = computeRmse(model, test, params.implicitPrefs)
117151

118152
println(s"Test RMSE = $rmse.")
119153

120154
sc.stop()
121155
}
122156

123157
/** Compute RMSE (Root Mean Squared Error). */
124-
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = {
158+
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean) = {
159+
160+
def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
161+
125162
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
126-
val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
127-
.join(data.map(x => ((x.user, x.product), x.rating)))
128-
.values
129-
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
163+
val predictionsAndRatings = predictions.map{ x =>
164+
((x.user, x.product), mapPredictedRating(x.rating))
165+
}.join(data.map(x => ((x.user, x.product), x.rating))).values
166+
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
130167
}
131168
}

0 commit comments

Comments
 (0)