Requirements
- The number of movies a given user has watched
- The details of those movies, printed as (MovieId,UserId,Title,Genres)
20 Number of movies watched: 24
Details of these movies:
(1375,20,Star Trek III: The Search for Spock (1984),Action|Adventure|Sci-Fi)
(1694,20,Apostle, The (1997),Drama)
(1371,20,Star Trek: The Motion Picture (1979),Action|Adventure|Sci-Fi)
(3863,20,Cell, The (2000),Sci-Fi|Thriller)
(3753,20,Patriot, The (2000),Action|Drama|War)
(3717,20,Gone in 60 Seconds (2000),Action|Crime)
(648,20,Mission: Impossible (1996),Action|Adventure|Mystery)
(457,20,Fugitive, The (1993),Action|Thriller)
(110,20,Braveheart (1995),Action|Drama|War)
(589,20,Terminator 2: Judgment Day (1991),Action|Sci-Fi|Thriller)
Reference implementation
package com.movie
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
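object MovieDemo02 {
  def main(args: Array[String]): Unit = {
    // Description of the raw data
    /**
     * User behavior analysis for a movie review system: collecting, filtering, processing,
     * and presenting all data on users watching and rating movies.
     * Data description:
     * 1. "ratings.dat": UserID::MovieID::Rating::Timestamp
     * 2. "users.dat": UserID::Gender::Age::OccupationID::Zip-code
     * 3. "movies.dat": MovieID::Title::Genres
     * 4. "occupations.dat": OccupationID::OccupationName. This small table is usually held
     *    in an in-memory structure such as a HashSet so that it can be used for a map join.
     */
    // Goal: the number of movies a given user has watched,
    // and the details of those movies as (MovieId,UserId,Title,Genres)
    // Set the log level
    Logger.getLogger("org").setLevel(Level.ERROR) // only log errors from org.* packages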
    // Spark configuration: optional args(0) = master URL, args(1) = application name
    var masterUrl = "local[*]"
    var appName = "movie analysis"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    // Independent check, so both arguments can be supplied together
    if (args.length > 1) {
      appName = args(1)
    }
    val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
    val sc = new SparkContext(conf)
    // Adjust this path to where the data files are stored
    val filepath = "src/main/java/com/movie/data/"
    val usersRDD = sc.textFile(filepath + "users.dat")
    val occupationsRDD = sc.textFile(filepath + "occupations.dat")
    val ratingsRDD = sc.textFile(filepath + "ratings.dat")
    val moviesRDD = sc.textFile(filepath + "movies.dat")
    // Cache the inputs so repeated actions do not re-read the files
    occupationsRDD.cache()
    usersRDD.cache()
    ratingsRDD.cache()
    moviesRDD.cache()
    val userId = "20"
    // "ratings.dat": UserID::MovieID::Rating::Timestamp
    // Approach 1: filter the ratings by UserID and count them
    // val userWatchedMovie = ratingsRDD.map(x => x.split("::")).filter(
    //   item => item(0).equals(userId)
    // )
    // println(userId + " Number of movies watched: " + userWatchedMovie.count())
    // Approach 2: key each rating by MovieID so it can be joined with movies.dat
    val userWatchMovie = ratingsRDD.map(x => x.split("::"))
      // (MovieID, UserID)
      .map(user => (user(1), user(0)))
      .filter(_._2.equals(userId))
    // Example counts per user:
    // UserID  count
    // 1       53
    // 20      24
    // 18      305
    // val data = userWatchMovie.lookup(userId)
    println(userId + " \n Number of movies watched: \n" + userWatchMovie.count())
    // Single-line alternative:
    // println(userId + " Number of movies watched: " + userWatchMovie.count())
    println("Details of these movies: \n")
    // "movies.dat": MovieID::Title::Genres
    val movieInfoRDD = moviesRDD.map(_.split("::")) // [MovieID, Title, Genres]
      .map(movie => {
        (movie(0), (movie(1), movie(2)))
      }) // (MovieID, (Title, Genres))
    val result = userWatchMovie.join(movieInfoRDD) // (MovieID, (UserID, (Title, Genres)))
      .map(item => {
        (item._1, item._2._1, item._2._2._1, item._2._2._2)
      }) // (MovieID, UserID, Title, Genres)
    result.take(10).foreach(println)
    sc.stop()
  }
}
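The data description above notes that a small table is usually kept in memory so it can be used for a map join. The following is a minimal sketch of that idea using plain Scala collections, so the parsing and join logic can be checked without a Spark cluster. The object name `MapJoinSketch`, the helpers `movieTable` and `watchedBy`, and the sample rows are all hypothetical; the rating values and timestamps are made up for illustration. In a Spark job the lookup table would typically be shipped to the executors with `sc.broadcast` instead of being joined through a shuffle.

```scala
object MapJoinSketch {
  // Hypothetical sample rows in the ratings.dat layout: UserID::MovieID::Rating::Timestamp.
  // Ratings and timestamps are invented for this example.
  val ratingLines: Seq[String] = Seq(
    "20::1375::3::1000000001",
    "20::110::5::1000000002",
    "18::110::4::1000000003"
  )

  // Sample rows in the movies.dat layout: MovieID::Title::Genres.
  val movieLines: Seq[String] = Seq(
    "110::Braveheart (1995)::Action|Drama|War",
    "1375::Star Trek III: The Search for Spock (1984)::Action|Adventure|Sci-Fi",
    "457::Fugitive, The (1993)::Action|Thriller"
  )

  // Build the in-memory lookup table: MovieID -> (Title, Genres).
  // Lines that do not have exactly three fields are skipped.
  def movieTable(lines: Seq[String]): Map[String, (String, String)] =
    lines.map(_.split("::")).collect {
      case Array(id, title, genres) => id -> ((title, genres))
    }.toMap

  // Keep the ratings of one user and look each MovieID up in the table.
  // Movies missing from the table are dropped, as in an inner join.
  def watchedBy(
      userId: String,
      ratings: Seq[String],
      movies: Map[String, (String, String)]
  ): Seq[(String, String, String, String)] =
    ratings.map(_.split("::")).collect {
      case Array(uid, movieId, _, _) if uid == userId => (movieId, uid)
    }.flatMap { case (movieId, uid) =>
      movies.get(movieId).map { case (title, genres) =>
        (movieId, uid, title, genres)
      }.toList
    }

  def main(args: Array[String]): Unit = {
    val movies = movieTable(movieLines)
    val result = watchedBy("20", ratingLines, movies)
    println("20 Number of movies watched: " + result.size)
    println("Details of these movies:")
    result.foreach(println)
  }
}
```

Because the lookup happens locally against a `Map`, no records need to be regrouped by key. This only pays off when the smaller table fits comfortably in memory on every worker.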