Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Clustering Coefficient algorithm #15

Merged
merged 3 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions example/src/main/resources/data.csv
Original file line number Diff line number Diff line change
@@ -1,14 +0,0 @@
id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13
1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10
2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10
3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10
4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10
5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10
6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10
7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10
8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10
9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10
10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10
-1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10
-2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10
-3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10
10 changes: 0 additions & 10 deletions example/src/main/resources/edge

This file was deleted.

Empty file.
10 changes: 0 additions & 10 deletions example/src/main/resources/vertex

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package com.vesoft.nebula.algorithm

object ClusteringCoefficientExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

object DegreeStaticExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

object GraphTriangleCountExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

object LpaExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

object PageRankExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
object ReadData {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
object SCCExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
object ShortestPathExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
object TriangleCountExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
object WCCExample {}
9 changes: 8 additions & 1 deletion nebula-algorithm/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,18 @@
# graphTriangleCount parameter
graphtrianglecount:{}

# Betweenness centrality parameter
# Betweenness centrality parameter. maxIter parameter means the max times of iterations.
betweenness:{
maxIter:5
}

# Clustering Coefficient parameter. The type parameter has two choice, local or global
# local type will compute the clustering coefficient for each vertex, and print the average coefficient for graph.
# global type just compute the graph's clustering coefficient.
clusteringcoefficient:{
type: local
}

# SingleSourceShortestPathAlgo parameter
singlesourceshortestpath:{
sourceid:"1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.vesoft.nebula.algorithm.config.{
AlgoConfig,
BetweennessConfig,
CcConfig,
CoefficientConfig,
Configs,
HanpConfig,
KCoreConfig,
Expand All @@ -22,6 +23,7 @@ import com.vesoft.nebula.algorithm.config.{
}
import com.vesoft.nebula.algorithm.lib.{
BetweennessCentralityAlgo,
ClusteringCoefficientAlgo,
ClosenessAlgo,
ConnectedComponentsAlgo,
DegreeStaticAlgo,
Expand Down Expand Up @@ -168,6 +170,10 @@ object Main {
case "graphtrianglecount" => {
GraphTriangleCountAlgo(spark, dataSet)
}
case "clusteringcoefficient" => {
val coefficientConfig = CoefficientConfig.getCoefficientConfig(configs)
ClusteringCoefficientAlgo(spark, dataSet, coefficientConfig)
}
case "closeness" => {
ClosenessAlgo(spark, dataSet, hasWeight)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,24 @@ object BetweennessConfig {
}
}

/**
* ClusterCoefficient
* algoType has two options: local or global
*/
case class CoefficientConfig(algoType: String)

object CoefficientConfig {
var algoType: String = _

def getCoefficientConfig(configs: Configs): CoefficientConfig = {
val coefficientConfig = configs.algorithmConfig.map
algoType = coefficientConfig("algorithm.clustercoefficient.type")
assert(algoType.equalsIgnoreCase("local") || algoType.equalsIgnoreCase("global"),
"ClusterCoefficient only support local or global type.")
CoefficientConfig(algoType)
}
}

/**
* Hanp
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,21 @@ object Configs {
}

object AlgoConstants {
val ALGO_ID_COL: String = "_id"
val PAGERANK_RESULT_COL: String = "pagerank"
val LOUVAIN_RESULT_COL: String = "louvain"
val KCORE_RESULT_COL: String = "kcore"
val LPA_RESULT_COL: String = "lpa"
val CC_RESULT_COL: String = "cc"
val SCC_RESULT_COL: String = "scc"
val BETWEENNESS_RESULT_COL: String = "betweennedss"
val SHORTPATH_RESULT_COL: String = "shortestpath"
val DEGREE_RESULT_COL: String = "degree"
val INDEGREE_RESULT_COL: String = "inDegree"
val OUTDEGREE_RESULT_COL: String = "outDegree"
val TRIANGLECOUNT_RESULT_COL: String = "tranglecount"
val CLOSENESS_RESULT_COL: String = "closeness"
val HANP_RESULT_COL: String = "hanp"
val NODE2VEC_RESULT_COL: String = "node2vec"
val ALGO_ID_COL: String = "_id"
val PAGERANK_RESULT_COL: String = "pagerank"
val LOUVAIN_RESULT_COL: String = "louvain"
val KCORE_RESULT_COL: String = "kcore"
val LPA_RESULT_COL: String = "lpa"
val CC_RESULT_COL: String = "cc"
val SCC_RESULT_COL: String = "scc"
val BETWEENNESS_RESULT_COL: String = "betweennedss"
val SHORTPATH_RESULT_COL: String = "shortestpath"
val DEGREE_RESULT_COL: String = "degree"
val INDEGREE_RESULT_COL: String = "inDegree"
val OUTDEGREE_RESULT_COL: String = "outDegree"
val TRIANGLECOUNT_RESULT_COL: String = "tranglecount"
val CLUSTERCOEFFICIENT_RESULT_COL: String = "clustercoefficient"
val CLOSENESS_RESULT_COL: String = "closeness"
val HANP_RESULT_COL: String = "hanp"
val NODE2VEC_RESULT_COL: String = "node2vec"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.{AlgoConstants, CoefficientConfig, KCoreConfig}
import com.vesoft.nebula.algorithm.utils.NebulaUtil
import org.apache.log4j.Logger
import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ClusteringCoefficientAlgo {
private val LOGGER = Logger.getLogger(this.getClass)

val ALGORITHM: String = "ClusterCoefficientAlgo"

/**
* run the clusterCoefficient algorithm for nebula graph
*/
def apply(spark: SparkSession,
dataset: Dataset[Row],
coefficientConfig: CoefficientConfig): DataFrame = {

val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false)
var algoResult: DataFrame = null

if (coefficientConfig.algoType.equalsIgnoreCase("local")) {
// compute local clustering coefficient
val localClusterCoefficient = executeLocalCC(graph)
val schema = StructType(
List(
StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
StructField(AlgoConstants.CLUSTERCOEFFICIENT_RESULT_COL, DoubleType, nullable = true)
))
algoResult = spark.sqlContext.createDataFrame(localClusterCoefficient, schema)
// print the graph's average clustering coefficient

import spark.implicits._
val vertexNum = algoResult.count()

val averageCoeff: Double =
if (vertexNum == 0) 0
else
algoResult.map(row => row.get(1).toString.toDouble).reduce(_ + _) / algoResult.count()
LOGGER.info(s"graph's average clustering coefficient is ${averageCoeff}")

} else {
// compute global clustering coefficient
val GlobalClusterCoefficient: Double = executeGlobalCC(graph)
val list = List(GlobalClusterCoefficient)
val rdd = spark.sparkContext.parallelize(list).map(row => Row(row))

val schema = StructType(
List(
StructField("globalClusterCoefficient", DoubleType, nullable = false)
))
algoResult = spark.sqlContext.createDataFrame(rdd, schema)
}
algoResult
}

/**
* execute local cluster coefficient
*/
def executeLocalCC(graph: Graph[None.type, Double]): RDD[Row] = {
// compute the actual triangle count for each vertex
val triangleNum = graph.triangleCount().vertices
// compute the open triangle count for each vertex
val idealTriangleNum = graph.degrees.mapValues(degree => degree * (degree - 1) / 2)
val result = triangleNum
.innerJoin(idealTriangleNum) { (vid, actualCount, idealCount) =>
{
if (idealCount == 0) 0.0
else (actualCount / idealCount * 1.0).formatted("%.6f").toDouble
}
}
.map(vertex => Row(vertex._1, vertex._2))

result
}

/**
* execute global cluster coefficient
*/
def executeGlobalCC(graph: Graph[None.type, Double]): Double = {
// compute the number of closed triangle
val closedTriangleNum = graph.triangleCount().vertices.map(_._2).reduce(_ + _)
// compute the number of open triangle and closed triangle (According to C(n,2)=n*(n-1)/2)
val triangleNum = graph.degrees.map(vertex => (vertex._2 * (vertex._2 - 1)) / 2.0).reduce(_ + _)
if (triangleNum == 0)
0.0
else
(closedTriangleNum / triangleNum * 1.0).formatted("%.6f").toDouble
}
}