Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add code comment for connector #32

Merged
merged 1 commit into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ object NebulaConnectionConfig {
protected var caSignParam: CASSLSignParams = null
protected var selfSignParam: SelfSSLSignParams = null

/**
* set nebula meta server address, multi addresses is split by English comma
*/
def withMetaAddress(metaAddress: String): ConfigBuilder = {
this.metaAddress = metaAddress
this
}

/**
* set nebula graph server address, multi addresses is split by English comma
*/
def withGraphAddress(graphAddress: String): ConfigBuilder = {
this.graphAddress = graphAddress
this
Expand Down Expand Up @@ -264,16 +270,25 @@ object WriteNebulaVertexConfig {
/** whether set vid as property */
var vidAsProp: Boolean = false

/**
* set space name
*/
def withSpace(space: String): WriteVertexConfigBuilder = {
this.space = space
this
}

/**
* set tag name
*/
def withTag(tagName: String): WriteVertexConfigBuilder = {
this.tagName = tagName
this
}

/**
* set which field in dataframe as nebula tag's id
*/
def withVidField(vidField: String): WriteVertexConfigBuilder = {
this.vidField = vidField
this
Expand Down Expand Up @@ -304,21 +319,33 @@ object WriteNebulaVertexConfig {
this
}

/**
* set user name for nebula graph
*/
def withUser(user: String): WriteVertexConfigBuilder = {
this.user = user
this
}

/**
* set password for nebula graph's user
*/
def withPasswd(passwd: String): WriteVertexConfigBuilder = {
this.passwd = passwd
this
}

/**
* set nebula write mode for nebula tag, INSERT or UPDATE
*/
def withWriteMode(writeMode: WriteMode.Value): WriteVertexConfigBuilder = {
this.writeMode = writeMode.toString
this
}

/**
* check and get WriteNebulaVertexConfig
*/
def build(): WriteNebulaVertexConfig = {
check()
new WriteNebulaVertexConfig(space,
Expand Down Expand Up @@ -417,6 +444,9 @@ object WriteNebulaEdgeConfig {

private val LOG: Logger = LoggerFactory.getLogger(WriteNebulaEdgeConfig.getClass)

/**
* a builder to create {@link WriteNebulaEdgeConfig}
*/
class WriteEdgeConfigBuilder {

var space: String = _
Expand All @@ -440,27 +470,37 @@ object WriteNebulaEdgeConfig {
/** whether set rank as property */
var rankAsProp: Boolean = false

/** write mode for nebula, insert or update */
var writeMode: String = WriteMode.INSERT.toString

/**
* set space name
*/
def withSpace(space: String): WriteEdgeConfigBuilder = {
this.space = space
this
}

/**
* set edge type name
*/
def withEdge(edgeName: String): WriteEdgeConfigBuilder = {
this.edgeName = edgeName
this
}

/**
* set rank field in dataframe
* it rankField is not set, then edge has no rank value
* it rankField is not set, then edge has default 0 rank value
* */
def withRankField(rankField: String): WriteEdgeConfigBuilder = {
this.rankField = rankField
this
}

/**
* set which field in dataframe as nebula edge's src id
*/
def withSrcIdField(srcIdField: String): WriteEdgeConfigBuilder = {
this.srcIdField = srcIdField
this
Expand All @@ -474,6 +514,9 @@ object WriteNebulaEdgeConfig {
this
}

/**
* set which field in dataframe as nebula edge's dst id
*/
def withDstIdField(dstIdField: String): WriteEdgeConfigBuilder = {
this.dstIdField = dstIdField
this
Expand Down Expand Up @@ -519,21 +562,33 @@ object WriteNebulaEdgeConfig {
this
}

/**
* set user name for nebula graph
*/
def withUser(user: String): WriteEdgeConfigBuilder = {
this.user = user
this
}

/**
* set password for nebula graph's user
*/
def withPasswd(passwd: String): WriteEdgeConfigBuilder = {
this.passwd = passwd
this
}

/**
* set write mode for nebula edge, INSERT or UPDATE
*/
def withWriteMode(writeMode: WriteMode.Value): WriteEdgeConfigBuilder = {
this.writeMode = writeMode.toString
this
}

/**
* check configs and get WriteNebulaEdgeConfig
*/
def build(): WriteNebulaEdgeConfig = {
check()
new WriteNebulaEdgeConfig(space,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ package com.vesoft.nebula.connector
import scala.collection.mutable.ListBuffer

object PartitionUtils {

/**
* compute each spark partition should assign how many nebula parts
*
* @param index spark partition index
* @param nebulaTotalPart nebula space partition number
* @param sparkPartitionNum spark total partition number
* @return the list of nebula partitions assign to spark index partition
*/
def getScanParts(index: Int, nebulaTotalPart: Int, sparkPartitionNum: Int): List[Integer] = {
val scanParts = new ListBuffer[Integer]
var currentPart = index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class GraphProvider(addresses: List[Address],

var session: Session = null

/**
* release session
*/
def releaseGraphClient(session: Session): Unit = {
session.release()
}
Expand All @@ -72,6 +75,14 @@ class GraphProvider(addresses: List[Address],
pool.close()
}

/**
* switch space
*
* @param user
* @param password
* @param space
* @return if execute succeed
*/
def switchSpace(user: String, password: String, space: String): Boolean = {
if (session == null) {
session = pool.getSession(user, password, true)
Expand All @@ -82,6 +93,12 @@ class GraphProvider(addresses: List[Address],
result.isSucceeded
}

/**
* execute the statement
*
* @param statement insert tag/edge statement
* @return execute result
*/
def submit(statement: String): ResultSet = {
if (session == null) {
LOG.error("graph session is null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,16 @@ class MetaProvider(addresses: List[Address],
}
client.connect()

/**
* get the partition num of nebula space
*/
def getPartitionNumber(space: String): Int = {
client.getPartsAlloc(space).size()
}

/**
* get the vid type of nebula space
*/
def getVidType(space: String): VidType.Value = {
val vidType = client.getSpace(space).getProperties.getVid_type.getType
if (vidType == PropertyType.FIXED_STRING) {
Expand All @@ -64,14 +70,35 @@ class MetaProvider(addresses: List[Address],
VidType.INT
}

/**
* get {@link Schema} of nebula tag
*
* @param space
* @param tag
* @return schema
*/
def getTag(space: String, tag: String): Schema = {
client.getTag(space, tag)
}

/**
* get {@link Schema} of nebula edge type
*
* @param space
* @param edge
* @return schema
*/
def getEdge(space: String, edge: String): Schema = {
client.getEdge(space, edge)
}

/**
* get tag's schema info
*
* @param space
* @param tag
* @return Map, property name -> data type {@link PropertyType}
*/
def getTagSchema(space: String, tag: String): Map[String, Integer] = {
val tagSchema = client.getTag(space, tag)
val schema = new mutable.HashMap[String, Integer]
Expand All @@ -83,6 +110,13 @@ class MetaProvider(addresses: List[Address],
schema.toMap
}

/**
* get edge's schema info
*
* @param space
* @param edge
* @return Map, property name -> data type {@link PropertyType}
*/
def getEdgeSchema(space: String, edge: String): Map[String, Integer] = {
val edgeSchema = client.getEdge(space, edge)
val schema = new mutable.HashMap[String, Integer]
Expand All @@ -94,6 +128,9 @@ class MetaProvider(addresses: List[Address],
schema.toMap
}

/**
* check if a label is Tag or Edge
*/
def getLabelType(space: String, label: String): DataTypeEnum.Value = {
val tags = client.getTags(space)
for (tag <- tags.asScala) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
}
}

/**
* submit buffer edges to nebula
*/
def execute(): Unit = {
val nebulaEdges = NebulaEdges(propNames, edges.toList, srcPolicy, dstPolicy)
val exec = nebulaOptions.writeMode match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
}
}

/**
* submit buffer vertices to nebula
*/
def execute(): Unit = {
val nebulaVertices = NebulaVertices(propNames, vertices.toList, policy)
val exec = nebulaOptions.writeMode match {
Expand Down