Skip to content

Commit

Permalink
support delete related edges when delete vertex (#53)
Browse files Browse the repository at this point in the history
* support delete related edges when delete vertex

* add test
  • Loading branch information
Nicole00 authored Aug 17, 2022
1 parent afdb61d commit f96b44f
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,14 @@ class WriteNebulaVertexConfig(space: String,
vidAsProp: Boolean,
user: String,
passwd: String,
writeMode: String)
writeMode: String,
deleteEdge: Boolean)
extends WriteNebulaConfig(space, user, passwd, batch, writeMode) {
def getTagName = tagName
def getVidField = vidField
def getVidPolicy = if (vidPolicy == null) "" else vidPolicy
def getVidAsProp = vidAsProp
def getTagName = tagName
def getVidField = vidField
def getVidPolicy = if (vidPolicy == null) "" else vidPolicy
def getVidAsProp = vidAsProp
def getDeleteEdge = deleteEdge
}

/**
Expand All @@ -270,6 +272,9 @@ object WriteNebulaVertexConfig {
/** whether set vid as property */
var vidAsProp: Boolean = false

/** whether delete the related edges of vertex */
var deleteEdge: Boolean = false

/**
* set space name
*/
Expand Down Expand Up @@ -343,6 +348,14 @@ object WriteNebulaVertexConfig {
this
}

/**
* set whether delete related edges when delete vertex
*/
def withDeleteEdge(deleteEdge: Boolean): WriteVertexConfigBuilder = {
this.deleteEdge = deleteEdge
this
}

/**
* check and get WriteNebulaVertexConfig
*/
Expand All @@ -356,7 +369,8 @@ object WriteNebulaVertexConfig {
vidAsProp,
user,
passwd,
writeMode)
writeMode,
deleteEdge)
}

private def check(): Unit = {
Expand Down Expand Up @@ -388,7 +402,7 @@ object WriteNebulaVertexConfig {
}
LOG.info(
s"NebulaWriteVertexConfig={space=$space,tagName=$tagName,vidField=$vidField," +
s"vidPolicy=$vidPolicy,batch=$batch,writeMode=$writeMode}")
s"vidPolicy=$vidPolicy,batch=$batch,writeMode=$writeMode,deleteEdge=$deleteEdge}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
var dstAsProp: Boolean = _
var rankAsProp: Boolean = _
var writeMode: WriteMode.Value = _
var deleteEdge: Boolean = _

if (operaType == OperaType.WRITE) {
require(parameters.isDefinedAt(GRAPH_ADDRESS),
Expand Down Expand Up @@ -166,6 +167,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
rankAsProp = parameters.getOrElse(RANK_AS_PROP, false).toString.toBoolean
writeMode =
WriteMode.withName(parameters.getOrElse(WRITE_MODE, DEFAULT_WRITE_MODE).toString.toLowerCase)
deleteEdge = parameters.getOrElse(DELETE_EDGE, false).toString.toBoolean
}

def getReturnCols: List[String] = {
Expand Down Expand Up @@ -249,6 +251,7 @@ object NebulaOptions {
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"

val DEFAULT_TIMEOUT: Int = 3000
val DEFAULT_CONNECTION_TIMEOUT: Int = 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object NebulaTemplate {
private[connector] val UPDATE_EDGE_TEMPLATE = "UPDATE %s ON `%s` %s->%s@%d SET %s"
private[connector] val UPDATE_VALUE_TEMPLATE = "`%s`=%s"

private[connector] val DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s"
private[connector] val DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"
private[connector] val EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"
private[connector] val DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s"
private[connector] val DELETE_VERTEX_WITH_EDGE_TEMPLATE = "DELETE VERTEX %s WITH EDGE"
private[connector] val DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"
private[connector] val EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ package object connector {
.option(NebulaOptions.BATCH, writeConfig.getBatch)
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.DELETE_EDGE, writeConfig.getDeleteEdge)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.vesoft.nebula.connector.NebulaTemplate.{
BATCH_INSERT_TEMPLATE,
DELETE_EDGE_TEMPLATE,
DELETE_VERTEX_TEMPLATE,
DELETE_VERTEX_WITH_EDGE_TEMPLATE,
EDGE_ENDPOINT_TEMPLATE,
EDGE_VALUE_TEMPLATE,
EDGE_VALUE_WITHOUT_RANKING_TEMPLATE,
Expand Down Expand Up @@ -365,26 +366,31 @@ object NebulaExecutor {
/**
* construct delete statement for vertex
*/
def toDeleteExecuteStatement(vertices: NebulaVertices): String = {
DELETE_VERTEX_TEMPLATE.format(
vertices.values
.map { value =>
vertices.policy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, value.vertexIDSlice)
def toDeleteExecuteStatement(vertices: NebulaVertices, deleteEdge: Boolean): String = {
if (deleteEdge)
DELETE_VERTEX_WITH_EDGE_TEMPLATE.format(genDeleteVertexInfo(vertices))
else
DELETE_VERTEX_TEMPLATE.format(genDeleteVertexInfo(vertices))
}

case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, value.vertexIDSlice)
private def genDeleteVertexInfo(vertices: NebulaVertices): String = {
vertices.values
.map { value =>
vertices.policy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, value.vertexIDSlice)

case None =>
value.vertexIDSlice
case _ =>
throw new IllegalArgumentException(
s"vertex policy ${vertices.policy.get} is not supported")
}
case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, value.vertexIDSlice)

case None =>
value.vertexIDSlice
case _ =>
throw new IllegalArgumentException(
s"vertex policy ${vertices.policy.get} is not supported")
}
.mkString(",")
)
}
.mkString(",")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaVertices)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaVertices)
case WriteMode.DELETE => NebulaExecutor.toDeleteExecuteStatement(nebulaVertices)
case WriteMode.DELETE =>
NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, nebulaOptions.deleteEdge)
case _ =>
throw new IllegalArgumentException(s"write mode ${nebulaOptions.writeMode} not supported.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,44 @@ object SparkMock {
spark.stop()
}

/**
* write nebula vertex with delete_with_edge mode
*/
def deleteVertexWithEdge(): Unit = {
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.master("local")
.config(sparkConf)
.getOrCreate()

val df = spark.read
.option("header", true)
.csv("src/test/resources/vertex.csv")

val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withGraphAddress("127.0.0.1:9669")
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test_write_string")
.withTag("person_connector")
.withVidField("id")
.withVidAsProp(false)
.withWriteMode(WriteMode.DELETE)
.withDeleteEdge(true)
.withBatch(5)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()

spark.stop()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,13 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
vertices.append(NebulaVertex("\"vid2\"", List()))

val nebulaVertices = NebulaVertices(List(), vertices.toList, None)
val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices)
val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, false)
val expectVertexDeleteStatement = "DELETE VERTEX \"vid1\",\"vid2\""
assert(expectVertexDeleteStatement.equals(vertexStatement))

val vertexWithEdgeStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, true)
val expectVertexWithEdgeDeleteStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE"
assert(expectVertexWithEdgeDeleteStatement.equals(vertexWithEdgeStatement))
}

test("test toDeleteExecuteStatement for vertex with HASH policy") {
Expand All @@ -346,9 +350,14 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
vertices.append(NebulaVertex("vid2", List()))

val nebulaVertices = NebulaVertices(List(), vertices.toList, Some(KeyPolicy.HASH))
val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices)
val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, false)
val expectVertexDeleteStatement = "DELETE VERTEX hash(\"vid1\"),hash(\"vid2\")"
assert(expectVertexDeleteStatement.equals(vertexStatement))

val vertexWithEdgeStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, true)
val expectVertexWithEdgeDeleteStatement =
"DELETE VERTEX hash(\"vid1\"),hash(\"vid2\") WITH EDGE"
assert(expectVertexWithEdgeDeleteStatement.equals(vertexWithEdgeStatement))
}

test("test toDeleteExecuteStatement for edge") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class WriteDeleteSuite extends AnyFunSuite with BeforeAndAfterAll {
graphMock.mockIntIdGraphSchema()
graphMock.close()
SparkMock.writeVertex()
SparkMock.writeEdge()
}

test("write vertex into test_write_string space with delete mode") {
Expand All @@ -36,6 +37,28 @@ class WriteDeleteSuite extends AnyFunSuite with BeforeAndAfterAll {
assert(resultSet.isEmpty)
}

test("write vertex into test_write_with_edge_string space with delete with edge mode") {
SparkMock.writeVertex()
SparkMock.writeEdge()
SparkMock.deleteVertexWithEdge()
val addresses: List[Address] = List(new Address("127.0.0.1", 9669))
val graphProvider = new GraphProvider(addresses, 3000)

graphProvider.switchSpace("root", "nebula", "test_write_string")
// assert vertex is deleted
val vertexResultSet: ResultSet =
graphProvider.submit("use test_write_string;match (v:person_connector) return v;")
assert(vertexResultSet.getColumnNames.size() == 0)
assert(vertexResultSet.isEmpty)

// assert edge is deleted
val edgeResultSet: ResultSet =
graphProvider.submit("use test_write_string;fetch prop on friend_connector 1->2@10")
assert(edgeResultSet.getColumnNames.size() == 0)
assert(edgeResultSet.isEmpty)

}

test("write edge into test_write_string space with delete mode") {
SparkMock.deleteEdge()
val addresses: List[Address] = List(new Address("127.0.0.1", 9669))
Expand Down

0 comments on commit f96b44f

Please sign in to comment.