- 查询任务,获取任务参数
- 查询指定日期范围内的用户访问行为数据
- 基于session粒度的页面切片生成
- 每个session单跳页面切片生成
- 页面流匹配算法
- 单跳页面切片生成及页面流匹配
- 计算页面切片转化率
long taskid = ParamUtils.getTaskIdFromArgs(args,
Constants.SPARK_LOCAL_TASKID_SESSION);
ITaskDAO taskDAO = DaoFactory.getTaskDAO();
Task task = taskDAO.findTaskById(taskid);
if(task == null) {
System.out.println(new Date() + ":cannot find this task with id "+taskid);
return;
}
JSONObject taskParam = JSONObject.parseObject(task.taskParam);
JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDataRange(sqlContext, taskParam);
public static JavaRDD<Row> getActionRDDByDataRange(SQLContext sqlContext,
JSONObject taskParam) {
String startDate = ParamUtils.getParam(taskParam,
Constants.PARAM_START_DATE);
String endDate = ParamUtils.getParam(taskParam,
Constants.PARAM_END_DATE);
String sql = "select * from user_visit_action where date>='"
+startDate+"' and date<='"+endDate+"'";
DataFrame actionDF = sqlContext.sql(sql);
System.out.println("startDate:"+startDate+" endDate:"+
endDate+"actionDF:"+actionDF.count());
return actionDF.javaRDD();
}
- 每个session单跳页面切片生成
- 页面流匹配算法
//页面切片的生成是基于session粒度的
JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
sessionid2actionRDD =
sessionid2actionRDD.cache();
//对<session,访问行为> RDD,做一次groupByKey 操作
//因为要拿到每个session对应的访问行为数据,才能去生成切片
JavaPairRDD<String,Iterable<Row>> sessionid2actionsRDD = sessionid2actionRDD.groupByKey();
public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {
return actionRDD.mapPartitionsToPair(iterator -> {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String,Row>>();
while (iterator.hasNext()) {
Row row = iterator.next();
list.add(new Tuple2<String, Row>(row.getString(2), row));
}
return list;
});
}
//每个session的单跳页面切片生成,以及页面流的匹配算法
JavaPairRDD<String, Integer> pageSplitRDD = generateAndMatchPageSplit(sc,sessionid2actionsRDD,taskParam);
Map<String, Object> pageSplitPvMap = pageSplitRDD.countByKey();
private static JavaPairRDD<String, Integer> generateAndMatchPageSplit(
JavaSparkContext sc, JavaPairRDD<String, Iterable<Row>> sessionid2actionsRDD,
JSONObject taskParam) {
String targetPageFlow = ParamUtils.getParam(taskParam,
Constants.PARAM_CATEGORY_IDS);
final Broadcast<String> targetPageFlowBroadCast = sc.broadcast(targetPageFlow);
return sessionid2actionsRDD.flatMapToPair(tuple -> {
Iterator<Row> iterator = tuple._2.iterator();
List<Tuple2<String, Integer>> list = new ArrayList<>();
//获取指定的页面流,1,2,3,4,5,6,7
String[] targetPages = targetPageFlowBroadCast.value().split(",");
//我们拿到的session访问行为默认情况下是乱序的,所以需要对session访问行为按照时间排序
//3,5,4,10,7 -> 3,4,5,7,10
List<Row> rows = new ArrayList<Row>();
while(iterator.hasNext()) {
rows.add(iterator.next());
}
//按时间排序
Collections.sort(rows, new Comparator<Row>() {
@Override
public int compare(Row o1, Row o2) {
String actionTime1 = o1.getString(4);
String actionTime2 = o2.getString(4);
Date date1 = DateUtils.parseTime(actionTime1);
Date date2 = DateUtils.parseTime(actionTime2);
return date1.compareTo(date2);
}
});
Long lastPageId = null;
for(Row row: rows) {
long pageid = row.getLong(3);
if(lastPageId == null) {
lastPageId = pageid;
continue;
}
//生成一个页面切片
//3,,5,2,1,8,9
//lastPage=3, 5 切片 3_5
String pageSplit = lastPageId + "_" +pageid;
//判断该切片是否在用户指定的页面中
for(int i=1; i<targetPages.length; i++) {
String targetPageSplit = targetPages[i-1] + "_" + targetPages[i];
if(pageSplit.equals(targetPageSplit)) {
list.add(new Tuple2<String, Integer>(pageSplit,1));
}
}
lastPageId = pageid;
}
return list;
});
}
- 获取起始页面访问量
- 计算切片页面转化率
long startPagePv = getStartPagePv(taskParam,sessionid2actionsRDD);
Map<String,Double> pageSplitConvertRate = computePageSplitConvertRate(taskParam,pageSplitPvMap,startPagePv);
/**
* 获取页面中初始页面PV
* @param taskParam
* @param sessionid2actionsRDD
* @return
*/
private static long getStartPagePv(JSONObject taskParam, JavaPairRDD<String, Iterable<Row>> sessionid2actionsRDD) {
String targetPageFlowStr = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW);
final long startPageId = Long.valueOf(targetPageFlowStr.split(",")[0]);
JavaRDD<Long> startPageRDD = sessionid2actionsRDD.flatMap(tuple -> {
List<Long> list = new ArrayList<>();
Iterator<Row> iterator = tuple._2.iterator();
while(iterator.hasNext()) {
Row row = iterator.next();
long pageid = row.getLong(3);
if(pageid == startPageId) {
list.add(pageid);
}
}
return list;
});
return startPageRDD.count();
}
/**
* 计算页面切片转化率
* @param taskParam
* @param pageSplitPvMap
* @param startPagePv
* @return
*/
private static Map<String, Double> computePageSplitConvertRate(JSONObject taskParam,
Map<String, Object> pageSplitPvMap, long startPagePv) {
Map<String, Double> convertRateMap = new HashMap<String, Double>();
String[] targetPages = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW).split(",");
long lastPageSplitPv = 0L;
for(int i=1; i<targetPages.length; i++) {
String targetPageSplit = targetPages[i-1] + "_" +targetPages[i];
long targetPageSplitPv = Long.valueOf(String.valueOf(pageSplitPvMap.get(targetPageSplit)));
double convertRate = 0.0;
//第一个页面,转化率是 目标页面访问量 / 起始页面访问量
//3,5,2,4,6
//3_5: 3_5.pv / 3.pv
//5_2: 5_2.pv / 3_5.pv
if(i == 1) {
convertRate = NumberUtils.formatDouble((double)targetPageSplitPv / (double)startPagePv, 2);
} else {
convertRate = NumberUtils.formatDouble((double)targetPageSplitPv / (double)lastPageSplitPv, 2);
}
convertRateMap.put(targetPageSplit, convertRate);
lastPageSplitPv = targetPageSplitPv;
}
return convertRateMap;
}