-
Notifications
You must be signed in to change notification settings - Fork 0
/
NLayerNetwork.scala
96 lines (79 loc) · 2.56 KB
/
NLayerNetwork.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* Created by mac on 3/25/16.
*/
import model.{Person, Link}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object NLayerNetwork {
// def subgraphWithVertexes( graph:Graph[Person,Link], set:Set[Long] ) : Graph[Person,Link]={
// graph.subgraph(vpred = (id,attr) => set.contains(id))
// }
/**
* 计算N层邻居,返回N层邻居所构成的子图
* find the n-layer neighbours of specified VertexId
*
* @param n the number of layers
* @param srcId the specified VertexId
* @return the set of n-layer neighbors
*/
def nLayerNetwork(graph:Graph[Person,Link], n:Int, srcId:Long): Graph[Person,Link]={
/**
* neighbours' vertexId of each vertexId
*/
val neighbors:RDD[(VertexId,Array[VertexId])] =
graph.collectNeighborIds(EdgeDirection.Either) //VertexId是Long的别名
neighbors.cache()
var temp = Array[Long](srcId)
val tempSet = mutable.Set[Long]()
val result = mutable.Set[Long]()//最后的结果集
val set = Set[Long]()
var m = 0
while(m<n){
for( i <- 0 until temp.size ) {
val idsArray = neighbors.lookup(temp(i))(0)
tempSet ++= idsArray.toSet
}
result ++= tempSet
tempSet --= temp
temp = tempSet.toArray
tempSet.clear()
m=m+1
}
graph.subgraph(vpred = (id,attr) => result.contains(id))
}
def nLayerNetworkWithDebug(graph:Graph[Person,Link], n:Int, srcId:Long): Graph[Person,Link]={
/**
* neighbours' vertexId of each vertexId
*/
val neighbors:RDD[(VertexId,Array[VertexId])] =
graph.collectNeighborIds(EdgeDirection.Either) //VertexId是Long的别名
neighbors.cache()
var temp = Array[Long](srcId);
val tempSet = mutable.Set[Long]()
val result = mutable.Set[Long]()//最后的结果集
val set = Set[Long]()
var m = 0
while(m<n){
// print("temp:")
// temp.foreach(x => print(x+" "))
// println("temp.size : " + temp.size )
for( i <- 0 until temp.size ) {
//取出temp(i)对应的唯一的Array[VertexId]
val idsArray = neighbors.lookup(temp(i))(0)
//把Array中的值全部加入tempSet
tempSet ++= idsArray.toSet
}
// println("tempSet:" + tempSet)
result ++= tempSet
// println("result:"+result)
// println()
//去重后,赋值给新一轮的temp
tempSet --= temp
temp = tempSet.toArray
tempSet.clear()
m=m+1
}
graph.subgraph(vpred = (id,attr) => result.contains(id))
}
}