Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/mllib-collaborative-filtering.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ model = ALS.train(ratings, rank, numIterations)
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
Expand Down
39 changes: 39 additions & 0 deletions python/pyspark/mllib/recommendation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):
>>> model.userFeatures().collect()
[(1, array('d', [...])), (2, array('d', [...]))]

>>> model.recommendUsers(1, 2)
[Rating(user=2, product=1, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
>>> model.recommendProducts(1, 2)
[Rating(user=1, product=2, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
>>> model.rank
4

>>> first_user = model.userFeatures().take(1)[0]
>>> latents = first_user[1]
>>> len(latents) == 4
Expand Down Expand Up @@ -105,21 +112,53 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):
... pass
"""
def predict(self, user, product):
"""
Predicts rating for the given user and product.
"""
return self._java_model.predict(int(user), int(product))

def predictAll(self, user_product):
"""
Returns a list of predicted ratings for input user and product pairs.
"""
assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
first = user_product.first()
assert len(first) == 2, "user_product should be RDD of (user, product)"
user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
return self.call("predict", user_product)

def userFeatures(self):
"""
Returns a paired RDD, where the first element is the user and the
second is an array of features corresponding to that user.
"""
return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))

def productFeatures(self):
"""
Returns a paired RDD, where the first element is the product and the
second is an array of features corresponding to that product.
"""
return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))

def recommendUsers(self, product, num):
"""
Recommends the top "num" number of users for a given product and returns a list
of Rating objects sorted by the predicted rating in descending order.
"""
return list(self.call("recommendUsers", product, num))

def recommendProducts(self, user, num):
"""
Recommends the top "num" number of products for a given user and returns a list
of Rating objects sorted by the predicted rating in descending order.
"""
return list(self.call("recommendProducts", user, num))

@property
def rank(self):
return self.call("rank")

@classmethod
def load(cls, sc, path):
model = cls._load_java(sc, path)
Expand Down