diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala
index 8f0e070..118cad6 100644
--- a/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala
+++ b/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala
@@ -12,12 +12,12 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 object ReadData {
 
   /**
-    * read edge data from local csv and apply clustering coefficient
-    * livejournal data: https://snap.stanford.edu/data/soc-LiveJournal1.txt.gz
-    *
-    * The livejournal data is put to hdfs, the path is
-    * hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt
-    */
+   * read edge data from local csv and apply clustering coefficient
+   * livejournal data: https://snap.stanford.edu/data/soc-LiveJournal1.txt.gz
+   *
+   * The livejournal data is put to hdfs, the path is
+   * hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt
+   */
   def readLiveJournalData(spark: SparkSession): DataFrame = {
     val df = spark.sparkContext.textFile(
       "hdfs://127.0.0.1:9000/user/root/livejournal/soc-LiveJournal1.txt")
@@ -30,14 +30,14 @@ object ReadData {

     val schema = StructType(
       List(StructField("src", StringType, nullable = false),
-           StructField("dst", StringType, nullable = true)))
+        StructField("dst", StringType, nullable = true)))
     val edgeDF = spark.sqlContext.createDataFrame(dd, schema)
     edgeDF
   }

   /**
-    * read edge data from csv
-    */
+   * read edge data from csv
+   */
   def readCsvData(spark: SparkSession): DataFrame = {
     val df = spark.read
       .option("header", true)
@@ -47,9 +47,9 @@ object ReadData {
   }

   /**
-    * read edge data from csv
-    * the data has string type id
-    */
+   * read edge data from csv
+   * the data has string type id
+   */
   def readStringCsvData(spark: SparkSession): DataFrame = {
     val df = spark.read
       .option("header", true)
@@ -59,8 +59,8 @@ object ReadData {
   }

   /**
-    * read edge data from Nebula
-    */
+   * read edge data from Nebula
+   */
   def readNebulaData(spark: SparkSession): DataFrame = {
     val config =
       NebulaConnectionConfig
@@ -80,4 +80,29 @@ object ReadData {
     val df: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
     df
   }
+
+  /**
+   * read edge data from Nebula by NGQL
+   */
+  def readNebulaDataByNgql(spark: SparkSession): DataFrame = {
+    val config =
+      NebulaConnectionConfig
+        .builder()
+        .withMetaAddress("127.0.0.1:9559")
+        .withGraphAddress("127.0.0.1:9669")
+        .withTimeout(6000)
+        .withConenctionRetry(2)
+        .build()
+    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
+      .builder()
+      .withSpace("test")
+      .withLabel("knows")
+      .withNoColumn(true)
+      .withLimit(2000)
+      .withNgql(" GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships ;")
+      .build()
+    val df: DataFrame = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDfByNgql()
+    df
+  }
+
 }
diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala
index a55a1e8..2fda550 100644
--- a/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala
+++ b/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala
@@ -24,6 +24,7 @@ object TriangleCountExample {

     // val csvDF = ReadData.readCsvData(spark)
     // val nebulaDF = ReadData.readNebulaData(spark)
+    // val nebulaDFbyNgql = ReadData.readNebulaDataByNgql(spark)
     val journalDF = ReadData.readLiveJournalData(spark)

     graphTriangleCount(spark, journalDF)
diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf
index a69010f..0221b84 100644
--- a/nebula-algorithm/src/main/resources/application.conf
+++ b/nebula-algorithm/src/main/resources/application.conf
@@ -10,7 +10,7 @@
   }

   data: {
-    # data source. optional of nebula,csv,json
+    # data source. optional of nebula,nebula-ngql,csv,json
     source: csv
     # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
     sink: csv
@@ -50,6 +50,25 @@
     }
   }

+  # Nebula Graph reader by ngql
+  nebula-ngql: {
+    # algo's data source from Nebula by ngql
+    read: {
+      # Nebula metad server address, multiple addresses are split by English comma. graphAddress must also be set.
+      metaAddress: "127.0.0.1:9559"
+      graphAddress: "127.0.0.1:9669"
+      # Nebula space
+      space: nb
+      # Nebula edge types, multiple labels means that data from multiple edges will union together
+      labels: ["serve"]
+      # Nebula edge property name for each edge type, this property will be used as the weight col for the algorithm.
+      # Make sure the weightCols correspond to the labels.
+      weightCols: ["start_year"]
+      # ngql for subgraph, the statement must return edges
+      ngql: "GET SUBGRAPH with prop 1 STEPS FROM \"2\" YIELD EDGES AS relationships;"
+    }
+  }
+
   local: {
     # algo's data source from Nebula. If data.source is csv or json, then this local.read can be valid.
     read:{
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
index 171cdb6..9a5ec87 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala
@@ -49,14 +49,14 @@ import org.apache.log4j.Logger
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

 /**
-  * This object is the entry of all graph algorithms.
-  *
-  * How to use this tool to run algorithm:
-  * 1. Configure application.conf file.
-  * 2. Make sure your environment has installed spark and started spark service.
-  * 3. Submit nebula algorithm application using this command:
-  * spark-submit --class com.vesoft.nebula.tools.algorithm.Main /your-jar-path/nebula-algorithm-1.1.0.jar -p /your-application.conf-path/application.conf
-  */
+ * This object is the entry of all graph algorithms.
+ *
+ * How to use this tool to run algorithm:
+ * 1. Configure application.conf file.
+ * 2. Make sure your environment has installed spark and started spark service.
+ * 3. Submit nebula algorithm application using this command:
+ * spark-submit --class com.vesoft.nebula.tools.algorithm.Main /your-jar-path/nebula-algorithm-1.1.0.jar -p /your-application.conf-path/application.conf
+ */
 object Main {

   private val LOGGER = Logger.getLogger(this.getClass)
@@ -102,12 +102,12 @@ object Main {
   }

   /**
-    * create data from datasource
-    *
-    * @param spark
-    * @param configs
-    * @return DataFrame
-    */
+   * create data from datasource
+   *
+   * @param spark
+   * @param configs
+   * @return DataFrame
+   */
   private[this] def createDataSource(spark: SparkSession,
                                      configs: Configs,
                                      partitionNum: String): DataFrame = {
@@ -117,6 +117,10 @@ object Main {
         val reader = new NebulaReader(spark, configs, partitionNum)
         reader.read()
       }
+      case "nebula-ngql" => {
+        val reader = new NebulaReader(spark, configs, partitionNum)
+        reader.readByNgql()
+      }
       case "csv" => {
         val reader = new CsvReader(spark, configs, partitionNum)
         reader.read()
@@ -130,13 +134,13 @@ object Main {
   }

   /**
-    * execute algorithms
-    * @param spark
-    * @param algoName
-    * @param configs
-    * @param dataSet
-    * @return DataFrame
-    */
+   * execute algorithms
+   * @param spark
+   * @param algoName
+   * @param configs
+   * @param dataSet
+   * @return DataFrame
+   */
   private[this] def executeAlgorithm(spark: SparkSession,
                                      algoName: String,
                                      configs: Configs,
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
index a508ab6..49ffe88 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
@@ -80,34 +80,47 @@ object NebulaConfigEntry {
     if (!config.hasPath("nebula")) {
       return NebulaConfigEntry(NebulaReadConfigEntry(), NebulaWriteConfigEntry())
     }
-    val nebulaConfig = config.getConfig("nebula")
+    var nebulaConfig: Config = ConfigFactory.empty()
+    if (config.hasPath("nebula")) {
+      nebulaConfig = config.getConfig("nebula")
+    } else if (config.hasPath("nebula-ngql")) {
+      nebulaConfig = config.getConfig("nebula-ngql")
+    }
+
     val readMetaAddress = nebulaConfig.getString("read.metaAddress")
-    val readSpace       = nebulaConfig.getString("read.space")
-    val readLabels      = nebulaConfig.getStringList("read.labels").asScala.toList
+    val readSpace = nebulaConfig.getString("read.space")
+    val readLabels = nebulaConfig.getStringList("read.labels").asScala.toList
     val readWeightCols = if (nebulaConfig.hasPath("read.weightCols")) {
       nebulaConfig.getStringList("read.weightCols").asScala.toList
     } else {
       List()
     }
-    val readConfigEntry =
-      NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols)
+    var readConfigEntry: NebulaReadConfigEntry = NebulaReadConfigEntry()
+    if (nebulaConfig.hasPath("read.ngql")) {
+      val ngql = nebulaConfig.getString("read.ngql")
+      val graphAddress = nebulaConfig.getString("read.graphAddress")
+      readConfigEntry = NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols, graphAddress, ngql)
+    } else {
+      readConfigEntry = NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols)
+    }
+
-    val graphAddress     = nebulaConfig.getString("write.graphAddress")
+    val graphAddress = nebulaConfig.getString("write.graphAddress")
     val writeMetaAddress = nebulaConfig.getString("write.metaAddress")
-    val user             = nebulaConfig.getString("write.user")
-    val pswd             = nebulaConfig.getString("write.pswd")
-    val writeSpace       = nebulaConfig.getString("write.space")
-    val writeTag         = nebulaConfig.getString("write.tag")
-    val writeType        = nebulaConfig.getString("write.type")
+    val user = nebulaConfig.getString("write.user")
+    val pswd = nebulaConfig.getString("write.pswd")
+    val writeSpace = nebulaConfig.getString("write.space")
+    val writeTag = nebulaConfig.getString("write.tag")
+    val writeType = nebulaConfig.getString("write.type")
     val writeConfigEntry =
       NebulaWriteConfigEntry(graphAddress,
-                             writeMetaAddress,
-                             user,
-                             pswd,
-                             writeSpace,
-                             writeTag,
-                             writeType)
+        writeMetaAddress,
+        user,
+        pswd,
+        writeSpace,
+        writeTag,
+        writeType)
     NebulaConfigEntry(readConfigEntry, writeConfigEntry)
   }
 }
@@ -203,7 +216,9 @@ case class NebulaConfigEntry(readConfigEntry: NebulaReadConfigEntry,
 case class NebulaReadConfigEntry(address: String = "",
                                  space: String = "",
                                  labels: List[String] = List(),
-                                 weightCols: List[String] = List()) {
+                                 weightCols: List[String] = List(),
+                                 graphAddress: String = "",
+                                 ngql: String = "") {
   override def toString: String = {
     s"NebulaReadConfigEntry: " +
       s"{address: $address, space: $space, labels: ${labels.mkString(",")}, " +
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala
index e478812..8f30770 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala
@@ -17,14 +17,13 @@ abstract class DataReader(spark: SparkSession, configs: Configs) {
 }

 class NebulaReader(spark: SparkSession, configs: Configs, partitionNum: String)
-    extends DataReader(spark, configs) {
+  extends DataReader(spark, configs) {
   override def read(): DataFrame = {
     val metaAddress = configs.nebulaConfig.readConfigEntry.address
-    val space       = configs.nebulaConfig.readConfigEntry.space
-    val labels      = configs.nebulaConfig.readConfigEntry.labels
-    val weights     = configs.nebulaConfig.readConfigEntry.weightCols
-    val partition   = partitionNum.toInt
-
+    val space = configs.nebulaConfig.readConfigEntry.space
+    val labels = configs.nebulaConfig.readConfigEntry.labels
+    val weights = configs.nebulaConfig.readConfigEntry.weightCols
+    val partition = partitionNum.toInt

     val config =
       NebulaConnectionConfig
@@ -65,13 +64,63 @@ class NebulaReader(spark: SparkSession, configs: Configs, partitionNum: String)
     }
     dataset
   }
+
+  def readByNgql(): DataFrame = {
+    val metaAddress = configs.nebulaConfig.readConfigEntry.address
+    val graphAddress = configs.nebulaConfig.readConfigEntry.graphAddress
+    val space = configs.nebulaConfig.readConfigEntry.space
+    val labels = configs.nebulaConfig.readConfigEntry.labels
+    val weights = configs.nebulaConfig.readConfigEntry.weightCols
+    val ngql = configs.nebulaConfig.readConfigEntry.ngql
+
+    val config =
+      NebulaConnectionConfig
+        .builder()
+        .withMetaAddress(metaAddress)
+        .withGraphAddress(graphAddress)
+        .withConenctionRetry(2)
+        .build()
+
+    val noColumn = weights.isEmpty
+
+    var dataset: DataFrame = null
+    for (i <- labels.indices) {
+      val returnCols: ListBuffer[String] = new ListBuffer[String]
+      if (configs.dataSourceSinkEntry.hasWeight && weights.nonEmpty) {
+        returnCols.append(weights(i))
+      }
+      val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
+        .builder()
+        .withSpace(space)
+        .withLabel(labels(i))
+        .withNoColumn(noColumn)
+        .withReturnCols(returnCols.toList)
+        .withNgql(ngql)
+        .build()
+      if (dataset == null) {
+        dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDfByNgql()
+        if (weights.nonEmpty) {
+          dataset = dataset.select("_srcId", "_dstId", weights(i))
+        }
+      } else {
+        var df = spark.read
+          .nebula(config, nebulaReadEdgeConfig)
+          .loadEdgesToDfByNgql()
+        if (weights.nonEmpty) {
+          df = df.select("_srcId", "_dstId", weights(i))
+        }
+        dataset = dataset.union(df)
+      }
+    }
+    dataset
+  }
 }

 class CsvReader(spark: SparkSession, configs: Configs, partitionNum: String)
-    extends DataReader(spark, configs) {
+  extends DataReader(spark, configs) {
   override def read(): DataFrame = {
     val delimiter = configs.localConfigEntry.delimiter
-    val header    = configs.localConfigEntry.header
+    val header = configs.localConfigEntry.header
     val localPath = configs.localConfigEntry.filePath
     val partition = partitionNum.toInt

@@ -82,8 +131,8 @@ class CsvReader(spark: SparkSession, configs: Configs, partitionNum: String)
       .option("delimiter", delimiter)
       .csv(localPath)
     val weight = configs.localConfigEntry.weight
-    val src    = configs.localConfigEntry.srcId
-    val dst    = configs.localConfigEntry.dstId
+    val src = configs.localConfigEntry.srcId
+    val dst = configs.localConfigEntry.dstId
     if (configs.dataSourceSinkEntry.hasWeight && weight != null && !weight.trim.isEmpty) {
       data.select(src, dst, weight)
     } else {
@@ -97,15 +146,15 @@ class CsvReader(spark: SparkSession, configs: Configs, partitionNum: String)
 }

 class JsonReader(spark: SparkSession, configs: Configs, partitionNum: String)
-    extends DataReader(spark, configs) {
+  extends DataReader(spark, configs) {
   override def read(): DataFrame = {
     val localPath = configs.localConfigEntry.filePath
-    val data      = spark.read.json(localPath)
+    val data = spark.read.json(localPath)
     val partition = partitionNum.toInt

     val weight = configs.localConfigEntry.weight
-    val src    = configs.localConfigEntry.srcId
-    val dst    = configs.localConfigEntry.dstId
+    val src = configs.localConfigEntry.srcId
+    val dst = configs.localConfigEntry.dstId
     if (configs.dataSourceSinkEntry.hasWeight && weight != null && !weight.trim.isEmpty) {
       data.select(src, dst, weight)
     } else {
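Below is a minimal sketch (not part of the patch) of how the new NGQL read path can be exercised end to end. The object name NgqlReadExample and the local Spark master are made up for illustration; the sketch assumes the readNebulaDataByNgql helper added to the example ReadData object above, whose meta/graph addresses, space "test", and edge type "knows" are themselves placeholders.

    package com.vesoft.nebula.algorithm

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object NgqlReadExample {
      def main(args: Array[String]): Unit = {
        // Local session for a quick smoke test; in practice the job is launched via spark-submit.
        val spark = SparkSession.builder().master("local[*]").appName("NgqlReadExample").getOrCreate()

        // Edges returned by the GET SUBGRAPH statement configured in readNebulaDataByNgql.
        val edgeDF: DataFrame = ReadData.readNebulaDataByNgql(spark)

        // The connector exposes the endpoints as _srcId/_dstId columns, so the result can be
        // fed to any nebula-algorithm routine that accepts an edge DataFrame.
        edgeDF.printSchema()
        println(s"edge count: ${edgeDF.count()}")

        spark.stop()
      }
    }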