Skip to content

Commit

Permalink
add code comment for connector (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Jan 18, 2022
1 parent 54bce41 commit ecdf065
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ object NebulaConnectionConfig {
protected var caSignParam: CASSLSignParams = null
protected var selfSignParam: SelfSSLSignParams = null

/**
* set nebula meta server address, multi addresses is split by English comma
*/
def withMetaAddress(metaAddress: String): ConfigBuilder = {
this.metaAddress = metaAddress
this
}

/**
* set nebula graph server address, multi addresses is split by English comma
*/
def withGraphAddress(graphAddress: String): ConfigBuilder = {
this.graphAddress = graphAddress
this
Expand Down Expand Up @@ -264,16 +270,25 @@ object WriteNebulaVertexConfig {
/** whether set vid as property */
var vidAsProp: Boolean = false

/**
* set space name
*/
def withSpace(space: String): WriteVertexConfigBuilder = {
this.space = space
this
}

/**
* set tag name
*/
def withTag(tagName: String): WriteVertexConfigBuilder = {
this.tagName = tagName
this
}

/**
* set which field in dataframe as nebula tag's id
*/
def withVidField(vidField: String): WriteVertexConfigBuilder = {
this.vidField = vidField
this
Expand Down Expand Up @@ -304,21 +319,33 @@ object WriteNebulaVertexConfig {
this
}

/**
* set user name for nebula graph
*/
def withUser(user: String): WriteVertexConfigBuilder = {
this.user = user
this
}

/**
* set password for nebula graph's user
*/
def withPasswd(passwd: String): WriteVertexConfigBuilder = {
this.passwd = passwd
this
}

/**
* set nebula write mode for nebula tag, INSERT or UPDATE
*/
def withWriteMode(writeMode: WriteMode.Value): WriteVertexConfigBuilder = {
this.writeMode = writeMode.toString
this
}

/**
* check and get WriteNebulaVertexConfig
*/
def build(): WriteNebulaVertexConfig = {
check()
new WriteNebulaVertexConfig(space,
Expand Down Expand Up @@ -417,6 +444,9 @@ object WriteNebulaEdgeConfig {

private val LOG: Logger = LoggerFactory.getLogger(WriteNebulaEdgeConfig.getClass)

/**
* a builder to create {@link WriteNebulaEdgeConfig}
*/
class WriteEdgeConfigBuilder {

var space: String = _
Expand All @@ -440,27 +470,37 @@ object WriteNebulaEdgeConfig {
/** whether set rank as property */
var rankAsProp: Boolean = false

/** write mode for nebula, insert or update */
var writeMode: String = WriteMode.INSERT.toString

/**
* set space name
*/
def withSpace(space: String): WriteEdgeConfigBuilder = {
this.space = space
this
}

/**
* set edge type name
*/
def withEdge(edgeName: String): WriteEdgeConfigBuilder = {
this.edgeName = edgeName
this
}

/**
* set rank field in dataframe
* it rankField is not set, then edge has no rank value
* it rankField is not set, then edge has default 0 rank value
* */
def withRankField(rankField: String): WriteEdgeConfigBuilder = {
this.rankField = rankField
this
}

/**
* set which field in dataframe as nebula edge's src id
*/
def withSrcIdField(srcIdField: String): WriteEdgeConfigBuilder = {
this.srcIdField = srcIdField
this
Expand All @@ -474,6 +514,9 @@ object WriteNebulaEdgeConfig {
this
}

/**
* set which field in dataframe as nebula edge's dst id
*/
def withDstIdField(dstIdField: String): WriteEdgeConfigBuilder = {
this.dstIdField = dstIdField
this
Expand Down Expand Up @@ -519,21 +562,33 @@ object WriteNebulaEdgeConfig {
this
}

/**
* set user name for nebula graph
*/
def withUser(user: String): WriteEdgeConfigBuilder = {
this.user = user
this
}

/**
* set password for nebula graph's user
*/
def withPasswd(passwd: String): WriteEdgeConfigBuilder = {
this.passwd = passwd
this
}

/**
* set write mode for nebula edge, INSERT or UPDATE
*/
def withWriteMode(writeMode: WriteMode.Value): WriteEdgeConfigBuilder = {
this.writeMode = writeMode.toString
this
}

/**
* check configs and get WriteNebulaEdgeConfig
*/
def build(): WriteNebulaEdgeConfig = {
check()
new WriteNebulaEdgeConfig(space,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ package com.vesoft.nebula.connector
import scala.collection.mutable.ListBuffer

object PartitionUtils {

/**
* compute each spark partition should assign how many nebula parts
*
* @param index spark partition index
* @param nebulaTotalPart nebula space partition number
* @param sparkPartitionNum spark total partition number
* @return the list of nebula partitions assign to spark index partition
*/
def getScanParts(index: Int, nebulaTotalPart: Int, sparkPartitionNum: Int): List[Integer] = {
val scanParts = new ListBuffer[Integer]
var currentPart = index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class GraphProvider(addresses: List[Address],

var session: Session = null

/**
* release session
*/
def releaseGraphClient(session: Session): Unit = {
session.release()
}
Expand All @@ -72,6 +75,14 @@ class GraphProvider(addresses: List[Address],
pool.close()
}

/**
* switch space
*
* @param user
* @param password
* @param space
* @return if execute succeed
*/
def switchSpace(user: String, password: String, space: String): Boolean = {
if (session == null) {
session = pool.getSession(user, password, true)
Expand All @@ -82,6 +93,12 @@ class GraphProvider(addresses: List[Address],
result.isSucceeded
}

/**
* execute the statement
*
* @param statement insert tag/edge statement
* @return execute result
*/
def submit(statement: String): ResultSet = {
if (session == null) {
LOG.error("graph session is null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,16 @@ class MetaProvider(addresses: List[Address],
}
client.connect()

/**
* get the partition num of nebula space
*/
def getPartitionNumber(space: String): Int = {
client.getPartsAlloc(space).size()
}

/**
* get the vid type of nebula space
*/
def getVidType(space: String): VidType.Value = {
val vidType = client.getSpace(space).getProperties.getVid_type.getType
if (vidType == PropertyType.FIXED_STRING) {
Expand All @@ -64,14 +70,35 @@ class MetaProvider(addresses: List[Address],
VidType.INT
}

/**
* get {@link Schema} of nebula tag
*
* @param space
* @param tag
* @return schema
*/
def getTag(space: String, tag: String): Schema = {
client.getTag(space, tag)
}

/**
* get {@link Schema} of nebula edge type
*
* @param space
* @param edge
* @return schema
*/
def getEdge(space: String, edge: String): Schema = {
client.getEdge(space, edge)
}

/**
* get tag's schema info
*
* @param space
* @param tag
* @return Map, property name -> data type {@link PropertyType}
*/
def getTagSchema(space: String, tag: String): Map[String, Integer] = {
val tagSchema = client.getTag(space, tag)
val schema = new mutable.HashMap[String, Integer]
Expand All @@ -83,6 +110,13 @@ class MetaProvider(addresses: List[Address],
schema.toMap
}

/**
* get edge's schema info
*
* @param space
* @param edge
* @return Map, property name -> data type {@link PropertyType}
*/
def getEdgeSchema(space: String, edge: String): Map[String, Integer] = {
val edgeSchema = client.getEdge(space, edge)
val schema = new mutable.HashMap[String, Integer]
Expand All @@ -94,6 +128,9 @@ class MetaProvider(addresses: List[Address],
schema.toMap
}

/**
* check if a label is Tag or Edge
*/
def getLabelType(space: String, label: String): DataTypeEnum.Value = {
val tags = client.getTags(space)
for (tag <- tags.asScala) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
}
}

/**
* submit buffer edges to nebula
*/
def execute(): Unit = {
val nebulaEdges = NebulaEdges(propNames, edges.toList, srcPolicy, dstPolicy)
val exec = nebulaOptions.writeMode match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
}
}

/**
* submit buffer vertices to nebula
*/
def execute(): Unit = {
val nebulaVertices = NebulaVertices(propNames, vertices.toList, policy)
val exec = nebulaOptions.writeMode match {
Expand Down

0 comments on commit ecdf065

Please sign in to comment.