需求
- 读取信息,统计数据条数. 职业数, 电影数, 用户数, 评分条数
- 显示每个职业下的用户详细信息,显示为 (职业编号,(人的编号,性别,年龄,邮编),职业名)
职业数:21
电影数:3883
用户数:6040
评分条数:1000209
用户详情( 格式:(职业编号,(人的编号,性别,年龄,邮编),职业名)):
(4,((25,M,18,01609),college/grad student))
(4,((38,F,18,02215),college/grad student))
(4,((39,M,18,61820),college/grad student))
(4,((41,F,18,15116),college/grad student))
(4,((47,M,18,94305),college/grad student))
(4,((48,M,25,92107),college/grad student))
(4,((52,M,18,72212),college/grad student))
(4,((63,M,18,54902),college/grad student))
(4,((68,M,18,53706),college/grad student))
(4,((70,M,18,53703),college/grad student))
合并后共有:6040条users记录
参考实现
package com.movie
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object MovieDemo01 {
def main(args: Array[String]): Unit = {
/**
* 电影点评系统用户行为分析:用户观看电影和点评电影的所有行为数据的采集、过滤、处理和展示:
* 数据描述:
* 1,"ratings.dat":UserID::MovieID::Rating::Timestamp
* 2,"users.dat":UserID::Gender::Age::OccupationID::Zip-code
* 3,"movies.dat":MovieID::Title::Genres
* 4, "occupations.dat":OccupationID::OccupationName 一般情况下都会以程序中数据结构Haskset的方式存在,是为了做mapjoin
*/
// 1. 读取信息,统计数据条数. 职业数, 电影数, 用户数, 评分条数
// 2. 显示每个职业下的用户详细信息 显示为: (职业编号,(人的编号,性别,年龄,邮编),职业名)
// 设置日志级别
Logger.getLogger("org").setLevel(Level.ERROR) //配置日志
// spark conf
val masterUrl = "local[*]"
val appName = "movie analysis"
val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
val sc = new SparkContext(conf)
val filepath = "src/main/java/com/movie/data/"
val usersRDD = sc.textFile(filepath + "users.dat")
val occupationsRDD = sc.textFile(filepath + "occupations.dat")
val ratingsRDD = sc.textFile(filepath + "ratings.dat")
val moviesRDD = sc.textFile(filepath + "movies.dat")
occupationsRDD.cache()
usersRDD.cache()
ratingsRDD.cache()
moviesRDD.cache()
println("职业数:" + occupationsRDD.count())
println("电影数:" + moviesRDD.count())
println("用户数:" + usersRDD.count())
println("评分条数:" + ratingsRDD.count)
// UserID,Gender,Age,OccupationID,Zip-code
// 显示为: (职业编号,(人的编号,性别,年龄,邮编),职业名)
val usersBasic = usersRDD.map(x => x.split("::"))
// (OccupationID,(Gender,UserID,Gender,Age,Zip-code))
.map(user => (user(3), (user(0), user(1), user(2), user(4))))
// OccupationID,OccupationName
val occupations = occupationsRDD.map(_.split("::"))
.map(occupation => {
// (OccupationID,OccupationName)
(occupation(0), occupation(1))
})
// 合并
val usersInfo = usersBasic.join(occupations)
println("用户详情( 格式:(职业编号,(人的编号,性别,年龄,邮编),职业名)):")
usersInfo.take(10).foreach(println)
println("合并后共有:" + usersInfo.count() + "条users记录")
sc.stop()
}
}