diff --git a/README.md b/README.md
index b780e051..ea313424 100644
--- a/README.md
+++ b/README.md
@@ -176,7 +176,7 @@ only showing top 2 rows
 ### Write in PySpark
 
 Let's try a write example. By default, the `writeMode` is `insert`.
-
+#### write vertex
 ```python
 df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
     "type", "vertex").option(
@@ -190,7 +190,7 @@ df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
     "passwd", "nebula").option(
     "user", "root").save()
 ```
-
+#### delete vertex
 For the `delete` or `update` write mode, we can (for instance) specify `writeMode` as `delete`:
 
 ```python
 df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
@@ -206,6 +206,44 @@ df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
     "writeMode", "delete").option(
     "user", "root").save()
 ```
+#### write edge
+```python
+df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
+    .mode("overwrite")\
+    .option("srcPolicy", "")\
+    .option("dstPolicy", "")\
+    .option("metaAddress", "metad0:9559")\
+    .option("graphAddress", "graphd:9669")\
+    .option("user", "root")\
+    .option("passwd", "nebula")\
+    .option("type", "edge")\
+    .option("spaceName", "basketballplayer")\
+    .option("label", "server")\
+    .option("srcVertexField", "srcid")\
+    .option("dstVertexField", "dstid")\
+    .option("randkField", "")\
+    .option("batch", 100)\
+    .option("writeMode", "insert").save()  # writeMode accepts "insert", "update", or "delete"
+```
+#### delete edge
+```python
+df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
+    .mode("overwrite")\
+    .option("srcPolicy", "")\
+    .option("dstPolicy", "")\
+    .option("metaAddress", "metad0:9559")\
+    .option("graphAddress", "graphd:9669")\
+    .option("user", "root")\
+    .option("passwd", "nebula")\
+    .option("type", "edge")\
+    .option("spaceName", "basketballplayer")\
+    .option("label", "server")\
+    .option("srcVertexField", "srcid")\
+    .option("dstVertexField", "dstid")\
+    .option("randkField", "")\
+    .option("batch", 100)\
+    .option("writeMode", "delete").save()  # writeMode accepts "insert", "update", or "delete"
+```
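The comment in the examples above also names an `update` mode. For completeness, a minimal sketch of an edge update, mirroring the write-edge example (the option values are illustrative and not part of the patch):

```python
# sketch only: same options as the insert example above, with writeMode "update",
# which updates matching edges instead of inserting new ones
df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
    .mode("overwrite")\
    .option("metaAddress", "metad0:9559")\
    .option("graphAddress", "graphd:9669")\
    .option("user", "root")\
    .option("passwd", "nebula")\
    .option("type", "edge")\
    .option("spaceName", "basketballplayer")\
    .option("label", "server")\
    .option("srcVertexField", "srcid")\
    .option("dstVertexField", "dstid")\
    .option("randkField", "")\
    .option("batch", 100)\
    .option("writeMode", "update").save()
```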
 
 ### Options in PySpark
 
@@ -221,7 +259,7 @@ For more options, i.e. delete edge with vertex being deleted, refer to [nebula/c
   val VERTEX_FIELD = "vertexField"
   val SRC_VERTEX_FIELD = "srcVertexField"
   val DST_VERTEX_FIELD = "dstVertexField"
-  val RANK_FIELD = "rankField"
+  val RANK_FIELD = "randkField"
   val BATCH: String = "batch"
   val VID_AS_PROP: String = "vidAsProp"
   val SRC_AS_PROP: String = "srcAsProp"
@@ -251,6 +289,7 @@ spark = SparkSession.builder.config(
     "/path_to/nebula-spark-connector-3.0.0.jar").appName(
     "nebula-connector").getOrCreate()
 
+# read vertex
 df = spark.read.format(
     "com.vesoft.nebula.connector.NebulaDataSource").option(
     "type", "vertex").option(
diff --git a/README_CN.md b/README_CN.md
index d044134a..a612f6bb 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -176,7 +176,7 @@ only showing top 2 rows
 ### PySpark 中写 NebulaGraph 中数据
 
 再试一试写入数据的例子，默认不指定的情况下 `writeMode` 是 `insert`:
-
+#### 写入点
 ```python
 df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
     "type", "vertex").option(
@@ -190,6 +190,7 @@ df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
     "passwd", "nebula").option(
     "user", "root").save()
 ```
+#### 删除点
 
 如果想指定 `delete` 或者 `update` 的非默认写入模式，增加 `writeMode` 的配置，比如 `delete` 的例子:
 
 ```python
@@ -206,6 +207,44 @@ df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
     "writeMode", "delete").option(
     "user", "root").save()
 ```
+#### 写入边
+```python
+df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
+    .mode("overwrite")\
+    .option("srcPolicy", "")\
+    .option("dstPolicy", "")\
+    .option("metaAddress", "metad0:9559")\
+    .option("graphAddress", "graphd:9669")\
+    .option("user", "root")\
+    .option("passwd", "nebula")\
+    .option("type", "edge")\
+    .option("spaceName", "basketballplayer")\
+    .option("label", "server")\
+    .option("srcVertexField", "srcid")\
+    .option("dstVertexField", "dstid")\
+    .option("randkField", "")\
+    .option("batch", 100)\
+    .option("writeMode", "insert").save()  # writeMode accepts "insert", "update", or "delete"
+```
+#### 删除边
+```python
+df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
+    .mode("overwrite")\
+    .option("srcPolicy", "")\
+    .option("dstPolicy", "")\
+    .option("metaAddress", "metad0:9559")\
+    .option("graphAddress", "graphd:9669")\
+    .option("user", "root")\
+    .option("passwd", "nebula")\
+    .option("type", "edge")\
+    .option("spaceName", "basketballplayer")\
+    .option("label", "server")\
+    .option("srcVertexField", "srcid")\
+    .option("dstVertexField", "dstid")\
+    .option("randkField", "")\
+    .option("batch", 100)\
+    .option("writeMode", "delete").save()  # writeMode accepts "insert", "update", or "delete"
+```
 
 ### 关于 PySpark 读写的 option
 
@@ -222,7 +261,7 @@ df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
   val VERTEX_FIELD = "vertexField"
   val SRC_VERTEX_FIELD = "srcVertexField"
   val DST_VERTEX_FIELD = "dstVertexField"
-  val RANK_FIELD = "rankFiled"
+  val RANK_FIELD = "randkField"
   val BATCH: String = "batch"
   val VID_AS_PROP: String = "vidAsProp"
   val SRC_AS_PROP: String = "srcAsProp"
@@ -252,6 +291,7 @@ spark = SparkSession.builder.config(
     "/path_to/nebula-spark-connector-3.0.0.jar").appName(
     "nebula-connector").getOrCreate()
 
+# read vertex
 df = spark.read.format(
     "com.vesoft.nebula.connector.NebulaDataSource").option(
     "type", "vertex").option(
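Both READMEs gain a `# read vertex` comment on the same reader; as a counterpart, a minimal sketch of reading edges. It assumes the `follow` edge type with a `degree` property from the basketballplayer dataset, and the `label`, `returnCols`, and `partitionNumber` read options; all of these are assumptions, not part of this patch:

```python
# read edge: same reader, switched to edge-typed options (sketch only)
df = spark.read.format(
    "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(
    "spaceName", "basketballplayer").option(
    "label", "follow").option(
    "returnCols", "degree").option(
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()
df.show(2)
```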
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
index 6565493b..432b3594 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
@@ -220,7 +220,7 @@ object NebulaOptions {
   val TIMEOUT: String = "timeout"
   val CONNECTION_RETRY: String = "connectionRetry"
   val EXECUTION_RETRY: String = "executionRetry"
-  val RATE_TIME_OUT: String = "reteTimeOut"
+  val RATE_TIME_OUT: String = "rateTimeOut"
   val USER_NAME: String = "user"
   val PASSWD: String = "passwd"
   val ENABLE_GRAPH_SSL: String = "enableGraphSSL"
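Since `rateTimeOut` is a user-facing option key, a minimal sketch of passing the retry and timeout options declared above from PySpark; everything except the last four options mirrors the write-edge example in README.md, and the numeric values are illustrative assumptions, not connector defaults:

```python
# sketch only: option keys taken from the NebulaOptions constants above,
# values are illustrative; "rateTimeOut" is the key after this fix
# (previously it was only accepted as the misspelled "reteTimeOut")
df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
    .option("metaAddress", "metad0:9559")\
    .option("graphAddress", "graphd:9669")\
    .option("user", "root")\
    .option("passwd", "nebula")\
    .option("type", "edge")\
    .option("spaceName", "basketballplayer")\
    .option("label", "server")\
    .option("srcVertexField", "srcid")\
    .option("dstVertexField", "dstid")\
    .option("randkField", "")\
    .option("batch", 100)\
    .option("writeMode", "insert")\
    .option("timeout", 6000)\
    .option("connectionRetry", 2)\
    .option("executionRetry", 1)\
    .option("rateTimeOut", 1000)\
    .save()
```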