Support running algos on subgraph (#56)
* add nebula ngql reader example

* add config for nebula ngql

* fix nebula configuration file
MeeCreeps authored Oct 13, 2022
1 parent 4c56fe5 commit 5016629
Showing 6 changed files with 181 additions and 68 deletions.
53 changes: 39 additions & 14 deletions example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala
@@ -12,12 +12,12 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
object ReadData {

/**
 * read edge data from a local csv and apply the clustering coefficient
 * livejournal data: https://snap.stanford.edu/data/soc-LiveJournal1.txt.gz
 *
 * The LiveJournal data is stored on HDFS at
 * hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt
 */
def readLiveJournalData(spark: SparkSession): DataFrame = {
val df = spark.sparkContext.textFile(
"hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt")
Expand All @@ -30,14 +30,14 @@ object ReadData {

val schema = StructType(
List(StructField("src", StringType, nullable = false),
StructField("dst", StringType, nullable = true)))
StructField("dst", StringType, nullable = true)))
val edgeDF = spark.sqlContext.createDataFrame(dd, schema)
edgeDF
}

/**
 * read edge data from csv
 */
def readCsvData(spark: SparkSession): DataFrame = {
val df = spark.read
.option("header", true)
@@ -47,9 +47,9 @@
}

/**
 * read edge data from csv
 * the data has string type id
 */
def readStringCsvData(spark: SparkSession): DataFrame = {
val df = spark.read
.option("header", true)
@@ -59,8 +59,8 @@
}

/**
 * read edge data from Nebula
 */
def readNebulaData(spark: SparkSession): DataFrame = {
val config =
NebulaConnectionConfig
@@ -80,4 +80,29 @@
val df: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
df
}

/**
* read edge data from Nebula by NGQL
*/
def readNebulaDataByNgql(spark: SparkSession): DataFrame = {
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("knows")
.withNoColumn(true)
.withLimit(2000)
.withNgql(" GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships ;")
.build()
val df: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDfByNgql()
df
}

}
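For context, here is a minimal, hypothetical driver that exercises the reader added above. It assumes the Nebula services hard-coded in readNebulaDataByNgql (127.0.0.1) are reachable; the object name NgqlReadDemo and the show() call are illustrative only, not part of the commit.

  import com.vesoft.nebula.algorithm.ReadData
  import org.apache.spark.sql.{DataFrame, SparkSession}

  object NgqlReadDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession
        .builder()
        .appName("ngql-subgraph-read")
        .master("local[*]")
        .getOrCreate()

      // edges of the 1-step subgraph around vertex "2", fetched through graphd via nGQL
      val edgeDF: DataFrame = ReadData.readNebulaDataByNgql(spark)
      edgeDF.show(20)

      spark.stop()
    }
  }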
@@ -24,6 +24,7 @@ object TriangleCountExample {

// val csvDF = ReadData.readCsvData(spark)
// val nebulaDF = ReadData.readNebulaData(spark)
// val nebulaDFbyNgql = ReadData.readNebulaDataByNgql(spark)
val journalDF = ReadData.readLiveJournalData(spark)

graphTriangleCount(spark, journalDF)
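As a sketch of how the new reader plugs into this example, the commented-out line above could be enabled and its DataFrame passed to the same entry point (assuming graphTriangleCount keeps the signature used with journalDF):

  // run triangle count on the subgraph fetched via nGQL instead of the LiveJournal file
  val nebulaDFbyNgql = ReadData.readNebulaDataByNgql(spark)
  graphTriangleCount(spark, nebulaDFbyNgql)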
21 changes: 20 additions & 1 deletion nebula-algorithm/src/main/resources/application.conf
@@ -10,7 +10,7 @@
}

data: {
# data source. optional of nebula,nebula-ngql,csv,json
source: csv
# data sink, means the algorithm result will be written into this sink. optional of nebula,csv,text
sink: csv
@@ -50,6 +50,25 @@
}
}

# Nebula Graph reader via nGQL
nebula-ngql: {
  # algo's data source from Nebula, read via an nGQL statement
  read: {
    # Nebula metad server address; multiple addresses are separated by commas. graphAddress must also be set.
    metaAddress: "127.0.0.1:9559"
    graphAddress: "127.0.0.1:9669"
    # Nebula space
    space: nb
    # Nebula edge types; multiple labels mean that data from multiple edge types will be unioned together
    labels: ["serve"]
    # Nebula edge property name for each edge type; this property is used as the weight column for the algorithm.
    # Make sure the weightCols correspond to the labels.
    weightCols: ["start_year"]
    # nGQL statement for the subgraph; it must return edges
    ngql: "GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships;"
  }
}

local: {
# algo's data source from a local file. If data.source is csv or json, then this local.read config is used.
read:{
@@ -49,14 +49,14 @@ import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * This object is the entry point for all graph algorithms.
 *
 * How to use this tool to run an algorithm:
 *   1. Configure the application.conf file.
 *   2. Make sure Spark is installed and the Spark service is started in your environment.
 *   3. Submit the nebula-algorithm application with this command:
 *      spark-submit --class com.vesoft.nebula.tools.algorithm.Main /your-jar-path/nebula-algorithm-1.1.0.jar -p /your-application.conf-path/application.conf
 */
object Main {

private val LOGGER = Logger.getLogger(this.getClass)
@@ -102,12 +102,12 @@ object Main {
}

/**
 * create data from datasource
 *
 * @param spark
 * @param configs
 * @return DataFrame
 */
private[this] def createDataSource(spark: SparkSession,
configs: Configs,
partitionNum: String): DataFrame = {
@@ -117,6 +117,10 @@
val reader = new NebulaReader(spark, configs, partitionNum)
reader.read()
}
case "nebula-ngql" => {
val reader = new NebulaReader(spark, configs, partitionNum)
reader.readByNqgl()
}
case "csv" => {
val reader = new CsvReader(spark, configs, partitionNum)
reader.read()
@@ -130,13 +134,13 @@
}

/**
 * execute algorithms
 * @param spark
 * @param algoName
 * @param configs
 * @param dataSet
 * @return DataFrame
 */
private[this] def executeAlgorithm(spark: SparkSession,
algoName: String,
configs: Configs,
@@ -80,34 +80,47 @@ object NebulaConfigEntry {
if (!config.hasPath("nebula") && !config.hasPath("nebula-ngql")) {
  return NebulaConfigEntry(NebulaReadConfigEntry(), NebulaWriteConfigEntry())
}
var nebulaConfig: Config = ConfigFactory.empty()
if (config.hasPath("nebula")) {
  nebulaConfig = config.getConfig("nebula")
} else if (config.hasPath("nebula-ngql")) {
  nebulaConfig = config.getConfig("nebula-ngql")
}


val readMetaAddress = nebulaConfig.getString("read.metaAddress")
val readSpace = nebulaConfig.getString("read.space")
val readLabels = nebulaConfig.getStringList("read.labels").asScala.toList
val readWeightCols = if (nebulaConfig.hasPath("read.weightCols")) {
nebulaConfig.getStringList("read.weightCols").asScala.toList
} else {
List()
}
var readConfigEntry: NebulaReadConfigEntry = NebulaReadConfigEntry()
if (nebulaConfig.hasPath("read.ngql")) {
  val ngql         = nebulaConfig.getString("read.ngql")
  val graphAddress = nebulaConfig.getString("read.graphAddress")
  readConfigEntry =
    NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols, graphAddress, ngql)
} else {
  readConfigEntry = NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols)
}


val graphAddress = nebulaConfig.getString("write.graphAddress")
val writeMetaAddress = nebulaConfig.getString("write.metaAddress")
val user = nebulaConfig.getString("write.user")
val pswd = nebulaConfig.getString("write.pswd")
val writeSpace = nebulaConfig.getString("write.space")
val writeTag = nebulaConfig.getString("write.tag")
val writeType = nebulaConfig.getString("write.type")
val writeConfigEntry =
  NebulaWriteConfigEntry(graphAddress,
                         writeMetaAddress,
                         user,
                         pswd,
                         writeSpace,
                         writeTag,
                         writeType)
NebulaConfigEntry(readConfigEntry, writeConfigEntry)
}
}
@@ -203,7 +216,9 @@ case class NebulaConfigEntry(readConfigEntry: NebulaReadConfigEntry,
case class NebulaReadConfigEntry(address: String = "",
space: String = "",
labels: List[String] = List(),
weightCols: List[String] = List(),
graphAddress: String = "",
ngql: String = "") {
override def toString: String = {
s"NebulaReadConfigEntry: " +
s"{address: $address, space: $space, labels: ${labels.mkString(",")}, " +
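For reference, a small hypothetical sketch of how the nGQL variant of the read config maps onto the case class above. The field names come from NebulaReadConfigEntry as shown; the values are placeholders taken from the example code and configuration, not part of the commit.

  val ngqlReadConfig = NebulaReadConfigEntry(
    address      = "127.0.0.1:9559",  // metad address
    space        = "test",
    labels       = List("knows"),
    weightCols   = List(),
    graphAddress = "127.0.0.1:9669",  // graphd address, needed for the nGQL read path
    ngql         = "GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships;"
  )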