Skip to content

Commit

Permalink
support read edge data through ngql (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Nov 5, 2024
1 parent a623064 commit 8bdce4e
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,55 @@ package object connector {
.rdd
}


/**
 * Build a DataFrame of edges by running the configured nGQL statement against Nebula Graph.
 *
 * Both the connection config and the read config must have been set beforehand; the
 * assertion below fails otherwise.
 *
 * NOTE(review): the data type option is set to EDGE here. Confirm that the data source
 * routes a read carrying an nGQL statement to the nGQL-based reader rather than to the
 * storage-scan edge reader.
 *
 * @return DataFrame produced by loading the configured reader
 */
def loadEdgesToDfByNgql(): DataFrame = {
  assert(connectionConfig != null && readConfig != null,
         "nebula config is not set, please call nebula() before loadEdgesToDfByNgql")

  // Options describing what to read.
  val withReadOptions = reader
    .format(classOf[NebulaDataSource].getName)
    .option(NebulaOptions.TYPE, DataTypeEnum.EDGE.toString)
    .option(NebulaOptions.OPERATE_TYPE, OperaType.READ.toString)
    .option(NebulaOptions.SPACE_NAME, readConfig.getSpace)
    .option(NebulaOptions.LABEL, readConfig.getLabel)
    .option(NebulaOptions.RETURN_COLS, readConfig.getReturnCols.mkString(","))
    .option(NebulaOptions.NO_COLUMN, readConfig.getNoColumn)
    .option(NebulaOptions.LIMIT, readConfig.getLimit)
    .option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum)
    .option(NebulaOptions.NGQL, readConfig.getNgql)

  // Options describing how to connect.
  val ngqlReader = withReadOptions
    .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
    .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
    .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
    .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
    .option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
    .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL)
    .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL)

  // Credentials are forwarded only when both user and password are present.
  val credentialsPresent = readConfig.getUser != null && readConfig.getPasswd != null
  if (credentialsPresent) {
    ngqlReader.option(NebulaOptions.USER_NAME, readConfig.getUser)
    ngqlReader.option(NebulaOptions.PASSWD, readConfig.getPasswd)
  }

  // SSL parameters are forwarded when either the storage or the meta channel enables SSL.
  val sslEnabled = connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL
  if (sslEnabled) {
    val signType = connectionConfig.getSignType
    ngqlReader.option(NebulaOptions.SSL_SIGN_TYPE, signType)
    SSLSignType.withName(signType) match {
      case SSLSignType.CA =>
        ngqlReader.option(NebulaOptions.CA_SIGN_PARAM, connectionConfig.getCaSignParam)
      case SSLSignType.SELF =>
        ngqlReader.option(NebulaOptions.SELF_SIGN_PARAM, connectionConfig.getSelfSignParam)
    }
  }

  ngqlReader.load()
}


}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.connector.reader

import com.vesoft.nebula.Value
import com.vesoft.nebula.client.graph.data.{Relationship, ResultSet, ValueWrapper}
import com.vesoft.nebula.connector.NebulaUtils.NebulaValueGetter
import com.vesoft.nebula.connector.nebula.GraphProvider
import com.vesoft.nebula.connector.{NebulaOptions, NebulaUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.types.StructType
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters.asScalaBufferConverter

/**
 * Partition reader that runs the configured nGQL statement once, collects every edge of the
 * configured label found in the result set, and serves those edges as Spark rows.
 *
 * Each edge is laid out as: source id, destination id, rank, followed by the requested
 * return columns in the order given by `nebulaOptions.getReturnCols`.
 */
class NebulaNgqlEdgePartitionReader extends PartitionReader[InternalRow] {

  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  private var nebulaOptions: NebulaOptions                     = _
  private var graphProvider: GraphProvider                     = _
  private var schema: StructType                               = _
  private var resultSet: ResultSet                             = _
  private var edgeIterator: Iterator[ListBuffer[ValueWrapper]] = _

  // Both depend only on `schema`, so they are computed once instead of once per row.
  // They are lazy because `schema` is assigned by the auxiliary constructor, i.e. after
  // the primary constructor (and therefore plain vals) has already run.
  private lazy val getters: Array[NebulaValueGetter] = NebulaUtils.makeGetters(schema)
  private lazy val columnTypes                       = schema.fields.map(_.dataType)

  /**
   * Open a graph session, switch to the configured space, execute the nGQL statement and
   * materialise the matching edges.
   *
   * If any of these steps fails, the graph session is closed before the error is rethrown
   * so that a failed construction does not leak the session.
   *
   * @param nebulaOptions connector options (graph address, credentials, space, label, nGQL)
   * @param schema        Spark schema of the rows returned by [[get]]
   */
  def this(nebulaOptions: NebulaOptions, schema: StructType) = {
    this()
    this.schema = schema
    this.nebulaOptions = nebulaOptions
    this.graphProvider = new GraphProvider(
      nebulaOptions.getGraphAddress,
      nebulaOptions.user,
      nebulaOptions.passwd,
      nebulaOptions.timeout,
      nebulaOptions.enableGraphSSL,
      nebulaOptions.sslSignType,
      nebulaOptions.caSignParam,
      nebulaOptions.selfSignParam
    )
    try {
      // TODO(review): the outcome of switchSpace is not inspected here; confirm how
      // GraphProvider reports a failed space switch or session build.
      graphProvider.switchSpace(nebulaOptions.spaceName)
      resultSet = graphProvider.submit(nebulaOptions.ngql)
      edgeIterator = query()
    } catch {
      case scala.util.control.NonFatal(e) =>
        try graphProvider.close()
        catch {
          case scala.util.control.NonFatal(closeError) => e.addSuppressed(closeError)
        }
        throw e
    }
  }

  /**
   * Walk every cell of the result set and collect the edges whose name equals the
   * configured label.
   *
   * Accepted cell types are a single edge, a list (only its non-null edge elements are
   * considered) and a path (all of its relationships are considered). Null and unset cells
   * are skipped.
   *
   * @return iterator over the collected edges, each in the layout built by [[convertToEdge]]
   * @throws RuntimeException if a cell holds any other value type
   */
  def query(): Iterator[ListBuffer[ValueWrapper]] = {
    val edges      = new ListBuffer[ListBuffer[ValueWrapper]]
    val properties = nebulaOptions.getReturnCols
    for (i <- 0 until resultSet.rowsSize()) {
      val rowValues = resultSet.rowValues(i).values()
      for (j <- 0 until rowValues.size()) {
        val value     = rowValues.get(j)
        val valueType = value.getValue.getSetField
        valueType match {
          case Value.EVAL =>
            val relationship = value.asRelationship()
            if (checkLabel(relationship)) {
              edges.append(convertToEdge(relationship, properties))
            }
          case Value.LVAL =>
            edges ++= value
              .asList()
              .asScala
              .iterator
              .filter(e => e != null && e.isEdge() && checkLabel(e.asRelationship()))
              .map(e => convertToEdge(e.asRelationship(), properties))
          case Value.PVAL =>
            edges ++= value
              .asPath()
              .getRelationships()
              .asScala
              .iterator
              .filter(checkLabel)
              .map(convertToEdge(_, properties))
          case Value.NVAL | 0 =>
          // null or unset cell: nothing to collect
          case _ =>
            LOG.error(s"Unexpected edge type encountered: ${valueType}. Only edge or path should be returned.")
            throw new RuntimeException("Invalid nGQL return type. Value type conversion failed.")
        }
      }
    }
    edges.iterator
  }

  /**
   * @param relationship edge taken from the nGQL result
   * @return true when the edge name equals the configured label
   */
  def checkLabel(relationship: Relationship): Boolean = {
    this.nebulaOptions.label.equals(relationship.edgeName())
  }

  /**
   * Flatten an edge into the column layout served by this reader.
   *
   * @param relationship edge to convert
   * @param properties   names of the properties to append after the three fixed columns;
   *                     may be null or empty, in which case only the fixed columns are returned
   * @return source id, destination id, rank, then one entry per requested property. A
   *         property that the edge does not carry yields a null entry, which [[get]]
   *         turns into a null column.
   */
  def convertToEdge(relationship: Relationship,
                    properties: List[String]): ListBuffer[ValueWrapper] = {
    val edge = new ListBuffer[ValueWrapper]
    edge.append(relationship.srcId())
    edge.append(relationship.dstId())
    edge.append(new ValueWrapper(new Value(Value.IVAL, relationship.ranking()), "utf-8"))
    if (properties != null && properties.nonEmpty) {
      // Fetch the property map once rather than once per requested property.
      val edgeProperties = relationship.properties()
      properties.foreach(name => edge.append(edgeProperties.get(name)))
    }
    edge
  }

  /** @return true while there are collected edges that have not been returned by [[get]] */
  override def next(): Boolean = {
    edgeIterator.hasNext
  }

  /**
   * Consume the next collected edge and convert it into a Spark row.
   *
   * Columns whose value is missing or null are set to null. A value of a type that is not
   * handled below leaves its column untouched.
   *
   * @return a freshly allocated row holding the edge's columns
   */
  override def get(): InternalRow = {
    val mutableRow = new SpecificInternalRow(columnTypes)
    val edge       = edgeIterator.next()

    for (i <- getters.indices) {
      val value: ValueWrapper = edge(i)
      // `value` itself is null when a requested property is absent from the edge.
      if (value == null || value.isNull) {
        mutableRow.setNullAt(i)
      } else if (value.isString) {
        getters(i).apply(value.asString(), mutableRow, i)
      } else if (value.isDate) {
        getters(i).apply(value.asDate(), mutableRow, i)
      } else if (value.isTime) {
        getters(i).apply(value.asTime(), mutableRow, i)
      } else if (value.isDateTime) {
        getters(i).apply(value.asDateTime(), mutableRow, i)
      } else if (value.isLong) {
        getters(i).apply(value.asLong(), mutableRow, i)
      } else if (value.isBoolean) {
        getters(i).apply(value.asBoolean(), mutableRow, i)
      } else if (value.isDouble) {
        getters(i).apply(value.asDouble(), mutableRow, i)
      } else if (value.isGeography) {
        getters(i).apply(value.asGeography(), mutableRow, i)
      } else if (value.isDuration) {
        getters(i).apply(value.asDuration(), mutableRow, i)
      }
    }
    mutableRow
  }

  /** Close the graph session if one was opened. */
  override def close(): Unit = {
    if (graphProvider != null) {
      graphProvider.close()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import org.apache.spark.sql.types.StructType

/**
 * Creates the partition reader matching the configured data type.
 *
 * @param nebulaOptions connector options; `dataType` selects the reader implementation
 * @param schema        Spark schema handed to the created reader
 */
class NebulaPartitionReaderFactory(private val nebulaOptions: NebulaOptions,
                                   private val schema: StructType)
    extends PartitionReaderFactory {

  /**
   * @param inputPartition partition to read; must be a NebulaPartition
   * @return a vertex reader for the VERTEX data type, a storage-scan edge reader for the
   *         EDGE data type, and the nGQL edge reader for any other data type
   */
  override def createReader(inputPartition: InputPartition): PartitionReader[InternalRow] = {
    val partition = inputPartition.asInstanceOf[NebulaPartition].partition
    if (DataTypeEnum.VERTEX.toString.equals(nebulaOptions.dataType)) {
      new NebulaVertexPartitionReader(partition, nebulaOptions, schema)
    } else if (DataTypeEnum.EDGE.toString.equals(nebulaOptions.dataType)) {
      new NebulaEdgePartitionReader(partition, nebulaOptions, schema)
    } else {
      // The nGQL reader does not take a partition number.
      new NebulaNgqlEdgePartitionReader(nebulaOptions, schema)
    }
  }
}

0 comments on commit 8bdce4e

Please sign in to comment.