support k step neighbors parallel for start vids #79

Open · wants to merge 4 commits into base: master
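At a high level, the PR deletes the ad-hoc example driver DeepQueryTest.scala and instead registers two library algorithms, KStepNeighbors and KStepNeighborsParallel, in Main, together with the config classes KNeighborsConfig and KNeighborsParallelConfig. As a quick orientation, a hedged sketch of calling the new parallel variant directly; only the signatures visible in the diff below are assumed, while the helper name, the step counts and the start ids are illustrative:

import com.vesoft.nebula.algorithm.config.KNeighborsParallelConfig
import com.vesoft.nebula.algorithm.lib.KStepNeighborsParallel
import org.apache.spark.sql.{DataFrame, SparkSession}

// spark and edgeDf stand in for the session and edge DataFrame that Main builds
// via createDataSource; step counts and start ids here are illustrative only
def runParallelNeighbors(spark: SparkSession, edgeDf: DataFrame): Unit = {
  val config = KNeighborsParallelConfig(steps = List(1, 2, 3), startIds = List(100L, 200L))
  // queries the 1-, 2- and 3-step neighborhoods of vertices 100 and 200 in one run;
  // the result is not written back, which is why saveAlgoResult below skips null results
  KStepNeighborsParallel(spark, edgeDf, config)
}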
127 changes: 0 additions & 127 deletions example/src/main/scala/com/vesoft/nebula/algorithm/DeepQueryTest.scala

This file was deleted.

@@ -17,6 +17,8 @@ import com.vesoft.nebula.algorithm.config.{
HanpConfig,
JaccardConfig,
KCoreConfig,
KNeighborsConfig,
KNeighborsParallelConfig,
LPAConfig,
LouvainConfig,
Node2vecConfig,
@@ -36,6 +38,8 @@ import com.vesoft.nebula.algorithm.lib.{
HanpAlgo,
JaccardAlgo,
KCoreAlgo,
KStepNeighbors,
KStepNeighborsParallel,
LabelPropagationAlgo,
LouvainAlgo,
Node2vecAlgo,
@@ -83,7 +87,9 @@ object Main {

val startTime = System.currentTimeMillis()
// reader
val dataSet = createDataSource(sparkConfig.spark, configs, partitionNum)
// cache the source data and force materialization with count(), so that readTime
// below measures only the read and the algorithm timing starts from cached data
dataSet.cache()
dataSet.count()
val readTime = System.currentTimeMillis()

// algorithm
@@ -218,13 +224,25 @@ object Main {
val jaccardConfig = JaccardConfig.getJaccardConfig(configs)
JaccardAlgo(spark, dataSet, jaccardConfig)
}
case "kneighbors" => {
val kNeighborsConfig = KNeighborsConfig.getKNeighborConfig(configs)
KStepNeighbors(spark, dataSet, kNeighborsConfig)
}
case "keignborsparallel" => {
val kNeighborsParallelConfig =
KNeighborsParallelConfig.getKNeighborParallelConfig(configs)
KStepNeighborsParallel(spark, dataSet, kNeighborsParallelConfig)
}
case _ => throw new UnknownParameterException("unknown executeAlgo name.")
}
}
algoResult
}

private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = {
// the k-step neighbor algorithms return null instead of a result DataFrame,
// so there is nothing to write in that case
if (algoResult == null) {
return
}
val dataSink = configs.dataSourceSinkEntry.sink
dataSink.toLowerCase match {
case "nebula" => {
@@ -341,6 +341,46 @@ object JaccardConfig {
}
}

/**
* k-step neighbors
*/
case class KNeighborsConfig(steps: List[Int], startId: Long)

object KNeighborsConfig {
var steps: List[Int] = _
var startId: Long = _

def getKNeighborConfig(configs: Configs): KNeighborsConfig = {
val kNeighborConfig = configs.algorithmConfig.map
steps = kNeighborConfig("algorithm.kneighbors.steps").toString.split(",").map(_.toInt).toList
startId = kNeighborConfig("algorithm.kneighbors.startId").toString.toLong
KNeighborsConfig(steps, startId)
}
}

/**
* k-step neighbors for multiple start ids
*/
case class KNeighborsParallelConfig(steps: List[Int], startIds: List[Long])

object KNeighborsParallelConfig {
var steps: List[Int] = _
var startIds: List[Long] = _

def getKNeighborParallelConfig(configs: Configs): KNeighborsParallelConfig = {
val kNeighborParallelConfig = configs.algorithmConfig.map
steps = kNeighborParallelConfig("algorithm.kneighborsparallel.steps").toString
.split(",")
.map(_.toInt)
.toList
startIds = kNeighborParallelConfig("algorithm.kneighborsparallel.startIds").toString
.split(",")
.map(_.toLong)
.toList
KNeighborsParallelConfig(steps, startIds)
}
}
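
For clarity, a sketch of how these two configs can be populated. The application.conf key names come directly from the parsers above; the concrete values, and the idea of constructing the case classes by hand, are illustrative only:

// expected configuration entries (values are comma-separated strings):
//   algorithm.kneighbors.steps             = "1,2,3"
//   algorithm.kneighbors.startId           = "1"
//   algorithm.kneighborsparallel.steps     = "1,2,3"
//   algorithm.kneighborsparallel.startIds  = "1,2,3"
// equivalent direct construction, bypassing the config file:
val kConf  = KNeighborsConfig(steps = List(1, 2, 3), startId = 1L)
val kpConf = KNeighborsParallelConfig(steps = List(1, 2, 3), startIds = List(1L, 2L, 3L))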

case class AlgoConfig(configs: Configs)

object AlgoConfig {
@@ -0,0 +1,75 @@
/* Copyright (c) 2023 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.KNeighborsConfig
import com.vesoft.nebula.algorithm.utils.NebulaUtil
import org.apache.log4j.Logger
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, Pregel, VertexId}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import scala.collection.mutable.ArrayBuffer

object KStepNeighbors {
private val LOGGER = Logger.getLogger(this.getClass)

val ALGORITHM: String = "KStepNeighbors"

def apply(spark: SparkSession,
dataset: Dataset[Row],
kStepConfig: KNeighborsConfig): DataFrame = {
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false)
graph.persist()
// numVertices/numEdges force materialization of the cached graph, so graph
// loading is not counted in the per-step query timings below
graph.numVertices
graph.numEdges
dataset.unpersist(blocking = false)

execute(graph, kStepConfig.steps, kStepConfig.startId)
// results are only logged/printed inside execute; no result DataFrame is produced
null
}

def execute(graph: Graph[None.type, Double], steps: List[Int], startId: Long): Unit = {
// Boolean vertex attribute: true once a vertex is known to be reachable from
// startId within the current step limit; initially only the start vertex is marked
val queryGraph = graph.mapVertices { case (vid, _) => vid == startId }

val initialMessage = false
// propagate the "reached" flag across an edge in either direction,
// but only to endpoints that have not been reached yet
def sendMessage(edge: EdgeTriplet[Boolean, Double]): Iterator[(VertexId, Boolean)] = {
if (edge.srcAttr && !edge.dstAttr)
Iterator((edge.dstId, true))
else if (edge.dstAttr && !edge.srcAttr)
Iterator((edge.srcId, true))
else
Iterator.empty
}

val costs: ArrayBuffer[Long] = new ArrayBuffer[Long](steps.size)
val counts: ArrayBuffer[Long] = new ArrayBuffer[Long](steps.size)

for (iter <- steps) {
LOGGER.info(s">>>>>>>>>>>>>>> query ${iter} steps for $startId >>>>>>>>>>>>>>> ")
val startQuery = System.currentTimeMillis()
// run at most `iter` Pregel rounds, so only vertices within `iter` hops of startId are marked
val pregelGraph = Pregel(queryGraph, initialMessage, iter, EdgeDirection.Either)(
vprog = (id, attr, msg) => attr | msg,
sendMsg = sendMessage,
mergeMsg = (a, b) => a | b
)
val endQuery = System.currentTimeMillis()
val num = pregelGraph.vertices.filter(row => row._2).count()
costs.append(endQuery - startQuery)
counts.append(num)
}

val timeCosts = costs.toArray
val neighborNum = counts.toArray
for (i <- steps.indices) {
println(s"query ${steps(i)} step neighbors cost: ${timeCosts(i)} ms, neighbor number is: ${neighborNum(i)}")
}
}
}
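
To exercise the new object outside of Main, a self-contained local driver can look like the sketch below. It assumes, as elsewhere in nebula-algorithm, that NebulaUtil.loadInitGraph reads the first two DataFrame columns as Long vertex ids; the column names, the local[*] session and the sample edges are illustrative and not part of this PR:

import com.vesoft.nebula.algorithm.config.KNeighborsConfig
import com.vesoft.nebula.algorithm.lib.KStepNeighbors
import org.apache.spark.sql.SparkSession

object KStepNeighborsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kneighbors-demo").getOrCreate()
    import spark.implicits._
    // a small chain 1 - 2 - 3 - 4; the first two columns are treated as src and dst ids
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L)).toDF("src", "dst")
    // prints, for each step limit, the query time and the number of vertices
    // marked as reached from vertex 1; the call itself returns null by design
    KStepNeighbors(spark, edges, KNeighborsConfig(steps = List(1, 2, 3), startId = 1L))
    spark.stop()
  }
}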