Requirement
- Analyze how different occupations influence the movie genres watched; output one record per (OccupationName,(Genre,ViewCount)). Sample output (a minimal join-chain sketch follows it):
(academic/educator,(Children's,2828))
(academic/educator,(Comedy,2828))
(K-12 student,(Animation,2164))
(K-12 student,(Thriller,4212))
(K-12 student,(War,1385))
(K-12 student,(Horror,1905))
(K-12 student,(Documentary,98))
(K-12 student,(Comedy,9465))
(K-12 student,(Western,300))
(K-12 student,(Fantasy,1275))
(K-12 student,(Romance,2990))
(K-12 student,(Drama,6000))
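The job reduces to a chain of pair-RDD joins. Here is a tiny runnable sketch of those joins with made-up in-memory rows (the object name JoinShapeDemo and the sample tuples are illustrative only; the real job reads the .dat files as shown in the reference implementation below):

package com.movie
import org.apache.spark.{SparkConf, SparkContext}
object JoinShapeDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("join shapes"))
    val userOcc = sc.parallelize(Seq(("1", "academic/educator"))) // (UserID, OccupationName)
    val ratings = sc.parallelize(Seq(("1", "10"), ("1", "20"))) // (UserID, MovieID)
    val genres = sc.parallelize(Seq(("10", "Animation"), ("10", "Comedy"), ("20", "Comedy"))) // (MovieID, Genre)
    userOcc.join(ratings) // (UserID, (OccupationName, MovieID))
      .map { case (_, (occ, movie)) => (movie, occ) } // (MovieID, OccupationName)
      .join(genres) // (MovieID, (OccupationName, Genre))
      .map { case (_, (occ, genre)) => (occ, genre) } // one record per viewing event
      .collect().foreach(println)
    sc.stop()
  }
}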
Reference implementation
package com.movie
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object MovieDemo09 {
def main(args: Array[String]): Unit = {
/**
* Movie review system user behavior analysis: collect, filter, process and
* display all user viewing and rating behavior data. Input files:
* 1. "ratings.dat": UserID::MovieID::Rating::Timestamp
* 2. "users.dat": UserID::Gender::Age::OccupationID::Zip-code
* 3. "movies.dat": MovieID::Title::Genres
* 4. "occupations.dat": OccupationID::OccupationName
*/
// Requirement 9: analyze how different occupations influence the movie genres watched (OccupationName,(Genre,ViewCount))
// Set the log level so Spark's own INFO logging does not drown out the output
Logger.getLogger("org").setLevel(Level.ERROR) // configure logging
// spark conf
val masterUrl = "local[*]"
val appName = "movie analysis"
val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
val sc = new SparkContext(conf)
val filepath = "src/main/java/com/movie/data/"
val usersRDD = sc.textFile(filepath + "users.dat")
val occupationsRDD = sc.textFile(filepath + "occupations.dat")
val ratingsRDD = sc.textFile(filepath + "ratings.dat")
val moviesRDD = sc.textFile(filepath + "movies.dat")
// "movies.dat":MovieID::Title::Genres
// 需求 9: 分析不同职业对观看电影类型的影响 (职业名,(电影类型,观影次数))
moviesRDD.cache()
val user = usersRDD.map(_.split("::")) // UserID::Gender::Age::OccupationID::Zip-code
.map(x => (x(3), x(0))) // (OccupationID, UserID)
val rating = ratingsRDD.map(_.split("::"))
.map(x => (x(0), x(1))) // (UserID, MovieID)
val occ = occupationsRDD.map(_.split("::"))
.map(x => (x(0), x(1))) // (OccupationID, OccupationName)
// Join users with occupations
val uoRDD = user.join(occ) // (OccupationID, (UserID, OccupationName))
.map(item => (item._2._1, item._2._2)) // (UserID, OccupationName)
// Map each movie to its genres
val moviesTypeRDD = moviesRDD.map(_.split("::"))
.map(x => (x(0), x(2))) // (MovieID, Genres), e.g. (1, Animation|Children's|Comedy)
.flatMapValues(types => {
types.split("\\|")
}) // => (1, Animation), (1, Children's), (1, Comedy)
val rdd = uoRDD.join(rating) // (UserID,(OccupationName,MovieID))
.map(item => (item._2._2, item._2._1)) // (MovieID, OccupationName)
.join(moviesTypeRDD) // (MovieID, (OccupationName, Animation))
.map(item => (item._2._1, (item._1, item._2._2))) // (OccupationName, (MovieID, Animation))
rdd.take(5).foreach(println)
// groupByKey yields (OccupationName, Iterable[(MovieID, Genre), (MovieID, Genre), ...])
val result = rdd.groupByKey()
.flatMapValues(pairs => {
// Count viewing events per genre for this occupation
var counts: Map[String, Int] = Map()
pairs.foreach(item => {
counts += (item._2 -> (counts.getOrElse(item._2, 0) + 1))
})
counts // each (Genre, count) entry becomes one output value
})
.sortByKey()
.collect()
result.take(10).foreach(println)
sc.stop()
}
}
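groupByKey ships every raw (MovieID, Genre) pair across the shuffle before counting. Here is a minimal alternative sketch for the counting step, assuming the same rdd of (OccupationName, (MovieID, Genre)) built in MovieDemo09; reduceByKey combines partial counts on the map side so only pre-aggregated counts are shuffled:

// Drop-in replacement for the groupByKey/flatMapValues counting step above.
val counted = rdd
  .map { case (occ, (_, genre)) => ((occ, genre), 1) } // key by (OccupationName, Genre)
  .reduceByKey(_ + _) // map-side combined counts
  .map { case ((occ, genre), n) => (occ, (genre, n)) } // (OccupationName, (Genre, count))
  .sortByKey()
counted.take(10).foreach(println)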
package com.movie
import java.sql.DriverManager
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object MovieDemo10 {
def main(args: Array[String]): Unit = {
// Requirement 10: persist the results of requirement 9 to MySQL
// Format: (OccupationName, (Genre, ViewCount))
// Set the log level
Logger.getLogger("org").setLevel(Level.ERROR) // configure logging
// spark conf
var masterUrl = "local[*]"
var appName = "movie analysis"
if (args.length > 0) {
masterUrl = args(0)
}
if (args.length > 1) {
appName = args(1)
}
val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
val sc = new SparkContext(conf)
val filepath = "src/main/java/com/movie/data/"
val usersRDD = sc.textFile(filepath + "users.dat")
val occupationsRDD = sc.textFile(filepath + "occupations.dat")
val ratingsRDD = sc.textFile(filepath + "ratings.dat")
val moviesRDD = sc.textFile(filepath + "movies.dat")
val occupation = occupationsRDD.map(x => x.split("::"))
.map(x => {
// OccupationID::OccupationName
(x(0), x(1))
})
val userInfo = usersRDD.map(x => x.split("::"))
.map(x => {
// OccupationID,UserID
(x(3), x(0))
})
// Target format: (OccupationName, (Genre, ViewCount))
// occupation.join(userInfo) => (OccupationID, (OccupationName, UserID))
// Join users with occupations
val userData = occupation.join(userInfo)
.map(x => {
// (UserID,OccupationName)
(x._2._2, x._2._1)
})
// "movies.dat":MovieID::Title::Genres
// 电影与电影类型
val moviesTypeRDD = moviesRDD.map(_.split("::"))
.map(x => (x(0), x(2))) // (编号,类型) => (1, Animation|Children's|Comedy ) =>
.flatMapValues(types => {
types.split("\\|")
}) // (编号,Animation), (编号,Children's), (编号,Comedy)
// (UserID, MovieID) pairs from the ratings file
val rating = ratingsRDD.map(_.split("::"))
.map(x => (x(0), x(1))) // (UserID, MovieID)
// userData: (UserID, OccupationName)
// moviesTypeRDD: (MovieID, Genre), e.g. (1, Animation), (1, Children's), (1, Comedy)
val allData = userData.join(rating) // (UserID,(OccupationName,MovieID))
.map(item => (item._2._2, item._2._1)) // (MovieID,OccupationName)
.join(moviesTypeRDD) // (MovieID,(OccupationName,Animation))
// (OccupationName,(MovieID,Animation))
.map(item => (item._2._1, (item._1, item._2._2)))
// allData.take(5).foreach(println)
// groupByKey yields (OccupationName, Iterable[(MovieID, Genre), (MovieID, Genre), ...])
val result = allData.groupByKey()
.flatMapValues(pairs => {
// Count viewing events per genre for this occupation
var counts: Map[String, Int] = Map()
pairs.foreach(item => {
counts += (item._2 -> (counts.getOrElse(item._2, 0) + 1))
})
counts // each (Genre, count) entry becomes one output value
})
result.mapPartitions(elements => {
val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "admin")
// Disable auto-commit so the whole partition is written as one transaction
con.setAutoCommit(false)
val pstmt = con.prepareStatement("insert into result(occ, mtype, nums) values(?,?,?)")
// Remember which occupations were inserted so the caller can print them
val inserted = new ArrayBuffer[String]()
for (ele <- elements) {
pstmt.setString(1, ele._1)
pstmt.setString(2, ele._2._1)
pstmt.setInt(3, ele._2._2)
pstmt.addBatch() // queue the row in the current batch
inserted += ele._1
}
pstmt.executeBatch()
con.commit()
con.setAutoCommit(true)
con.close()
println("partition written to MySQL")
inserted.iterator
})
.foreach(println)
sc.stop()
}
}
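mapPartitions is a lazy transformation, so the inserts above only run because of the trailing foreach(println). Since the database write is purely a side effect, here is a sketch using foreachPartition with try/finally instead, which makes the action explicit and closes the connection even on failure (same hypothetical JDBC URL, credentials, and result table as above):

// Drop-in replacement for the mapPartitions/foreach pair in MovieDemo10.
result.foreachPartition { elements =>
  val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "admin")
  try {
    con.setAutoCommit(false) // one transaction per partition
    val pstmt = con.prepareStatement("insert into result(occ, mtype, nums) values(?,?,?)")
    elements.foreach { case (occ, (mtype, nums)) =>
      pstmt.setString(1, occ)
      pstmt.setString(2, mtype)
      pstmt.setInt(3, nums)
      pstmt.addBatch()
    }
    pstmt.executeBatch()
    con.commit()
  } finally {
    con.close() // release the connection even if the batch fails
  }
}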
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for result
-- ----------------------------
DROP TABLE IF EXISTS `result`;
CREATE TABLE `result` (
`occ` varchar(255) DEFAULT NULL,
`mtype` varchar(255) DEFAULT NULL,
`nums` bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
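To spot-check the load, here is a small standalone read-back sketch over the same table (the connection settings mirror the assumptions above; the object name ResultCheck is illustrative):

package com.movie
import java.sql.DriverManager
object ResultCheck {
  def main(args: Array[String]): Unit = {
    val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "admin")
    try {
      val rs = con.createStatement()
        .executeQuery("select occ, mtype, nums from result order by nums desc limit 10")
      while (rs.next()) { // print the ten largest (occupation, genre) counts
        println(s"${rs.getString("occ")}\t${rs.getString("mtype")}\t${rs.getLong("nums")}")
      }
    } finally {
      con.close()
    }
  }
}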