From d0f03b52438ca916f01ec983d08ae0ae8110ac94 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 19 Oct 2021 11:47:07 +0800 Subject: [PATCH 1/6] add test data --- .../src/test/resources/edge.csv | 14 ++ .../src/test/resources/vertex.csv | 14 ++ .../nebula/connector/DataTypeEnumSuite.scala | 3 + .../nebula/connector/NebulaUtilsSuite.scala | 3 + .../connector/PartitionUtilsSuite.scala | 3 + .../connector/mock/NebulaGraphMock.scala | 8 + .../nebula/connector/mock/SparkMock.scala | 9 + .../connector/nebula/GraphProviderTest.scala | 2 + .../connector/nebula/MetaProviderTest.scala | 2 + .../nebula/connector/reader/ReadSuite.scala | 8 + .../connector/writer/WriteDeleteSuite.scala | 8 + .../connector/writer/WriteInsertSuite.scala | 205 ++++++++++++++++++ 12 files changed, 279 insertions(+) create mode 100644 nebula-spark-connector/src/test/resources/edge.csv create mode 100644 nebula-spark-connector/src/test/resources/vertex.csv create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala create mode 100644 nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala diff --git a/nebula-spark-connector/src/test/resources/edge.csv b/nebula-spark-connector/src/test/resources/edge.csv new file mode 100644 index 00000000..5034f490 --- /dev/null +++ b/nebula-spark-connector/src/test/resources/edge.csv @@ -0,0 +1,14 @@ +id1,id2,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13 +1,2,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10 +2,3,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10 +3,4,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10 +4,5,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10 +5,6,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10 +6,7,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10 +7,1,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10 +8,1,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10 +9,1,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10 +10,2,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10 +-1,5,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10 +-2,6,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10 +-3,7,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10 diff --git 
a/nebula-spark-connector/src/test/resources/vertex.csv b/nebula-spark-connector/src/test/resources/vertex.csv new file mode 100644 index 00000000..d4966c19 --- /dev/null +++ b/nebula-spark-connector/src/test/resources/vertex.csv @@ -0,0 +1,14 @@ +id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13 +1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10 +2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10 +3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10 +4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10 +5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10 +6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10 +7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10 +8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10 +9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10 +10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10 +-1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10 +-2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10 +-3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10 diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala new file mode 100644 index 00000000..8c4c2ec3 --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala @@ -0,0 +1,3 @@ +package com.vesoft.nebula.connector + +class DataTypeEnumTest {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala new file mode 100644 index 00000000..9f1ba7d6 --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala @@ -0,0 +1,3 @@ +package com.vesoft.nebula.connector + +class NebulaUtilsTest {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala new file mode 100644 index 00000000..436ffc6d --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala @@ -0,0 +1,3 @@ +package com.vesoft.nebula.connector + +class PartitionUtilsTest extends org.scalatest.FunSuite {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala new file mode 100644 index 00000000..3a213bbe --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.connector.mock +class NebulaGraphMock {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala new file mode 100644 index 00000000..47d69c4d --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala @@ -0,0 +1,9 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.connector.mock class SparkMock { + +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala new file mode 100644 index 00000000..b2577f4d --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala @@ -0,0 +1,2 @@ +package com.vesoft.nebula.connector.nebula +class GraphProviderTest {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala new file mode 100644 index 00000000..670261f1 --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala @@ -0,0 +1,2 @@ +package com.vesoft.nebula.connector.nebula +class MetaProviderTest {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala new file mode 100644 index 00000000..f5017bfb --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.connector.reader +class ReadSuite {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala new file mode 100644 index 00000000..5a9332a8 --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.connector.writer +class WriteDeleteSuite {} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala new file mode 100644 index 00000000..afcaa176 --- /dev/null +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala @@ -0,0 +1,205 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.connector.writer + +import com.facebook.thrift.protocol.TCompactProtocol +import com.vesoft.nebula.client.graph.data.ResultSet +import com.vesoft.nebula.connector.connector.{Address, NebulaDataFrameWriter} +import com.vesoft.nebula.connector.{ + NebulaConnectionConfig, + WriteNebulaEdgeConfig, + WriteNebulaVertexConfig +} +import com.vesoft.nebula.connector.mock.NebulaGraphMock +import com.vesoft.nebula.connector.nebula.GraphProvider +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +import scala.collection.mutable.ListBuffer + +class WriteSuite extends AnyFunSuite with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + val graphMock = new NebulaGraphMock + graphMock.mockStringIdGraphSchema() + graphMock.mockIntIdGraphSchema() + graphMock.close() + } + + test("write vertex into test_write_string space with insert mode") { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read.option("header", true).csv("src/test/resources/vertex.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig + .builder() + .withSpace("test_write_string") + .withTag("person") + .withVidField("id") + .withVidAsProp(false) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteVertexConfig).writeVertices() + + spark.stop() + + val addresses: List[Address] = List(new Address("127.0.0.1", 9559)) + val graphProvider = new GraphProvider(addresses) + + val createIndexResult: ResultSet = graphProvider.submit( + "use test_write_string; " + + "create tag index if not exists person_index on person(col1(20));") + Thread.sleep(5000) + graphProvider.submit( + "use test_write_string; " + + "rebuild tag index person_index;") + + Thread.sleep(5000) + + val resultSet: ResultSet = + graphProvider.submit("use test_write_string;match (v:person) return v;") + assert(resultSet.getColumnNames.size() == 1) + assert(resultSet.getRows.size() == 13) + val resultString: ListBuffer[String] = new ListBuffer[String] + for (i <- 0 until resultSet.getRows.size) { + resultString.append(resultSet.rowValues(i).toString) + } + + val expectResultString: ListBuffer[String] = new ListBuffer[String] + expectResultString.append( + "ColumnName: [v], Values: [(2 :person_connector {col13: utc time: 11:10:10.000000, timezoneOffset: 0, col12: 2.0999999046325684, col11: 1.1, col8: utc datetime: 2021-01-02T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 41, col10: false, col7: 2021-01-28, col4: 21, col5: 31, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 11, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 0, col10: true, col6: 0, col7: 2019-01-02, col4: 1112, col5: 22222, col2: \"abcdefgh\", col3: 2, col1: \"abb\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(6 :person_connector {col13: utc time: 15:10:10.000000, timezoneOffset: 0, col12: 2.5, col11: 1.5, col8: utc datetime: 2021-01-06T12:10:10.000000, 
timezoneOffset: 0, col9: 0, col10: false, col6: 45, col7: 2021-02-02, col4: 25, col5: 35, col2: \"王五\u0000\u0000\", col3: 15, col1: \"王五\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: true, col6: 0, col7: 2021-12-12, col4: 1111, col5: 2147483647, col2: \"abcdefgh\", col3: 6, col1: \"ab\\sf\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(1 :person_connector {col13: utc time: 10:10:10.000000, timezoneOffset: 0, col12: 2.0, col11: 1.0, col8: utc datetime: 2021-01-01T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 40, col10: true, col7: 2021-01-27, col4: 20, col5: 30, col2: \"tom\u0000\u0000\u0000\u0000\u0000\", col3: 10, col1: \"Tom\"} :person {col13: utc time: 11:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: 1, col1: \"aba\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(-1 :person_connector {col13: utc time: 20:10:10.000000, timezoneOffset: 0, col12: 3.0, col11: 2.0, col8: utc datetime: 2021-02-11T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 50, col7: 2021-02-07, col4: 30, col5: 40, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 20, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: -1, col1: \"\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(5 :person_connector {col13: utc time: 14:10:10.000000, timezoneOffset: 0, col12: 2.4000000953674316, col11: 1.4, col8: utc datetime: 2021-01-05T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 44, col10: false, col7: 2021-02-01, col4: 24, col5: 34, col2: \"李四\u0000\u0000\", col3: 14, col1: \"李四\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 0.0, col11: 0.0, col8: utc datetime: 1970-01-01T00:00:01.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 0, col7: 1970-01-01, col4: 1111, col5: 2147483647, col2: \"abcdefgh\", col3: 5, col1: \"a\\\"be\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(-3 :person_connector {col13: utc time: 22:10:10.000000, timezoneOffset: 0, col12: 3.200000047683716, col11: 2.2, col8: utc datetime: 2021-04-13T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 52, col7: 2021-02-09, col4: 32, col5: 42, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 22, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 10.0, col11: 10.0, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: 22222, col2: \"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\", col3: -3, col1: \"abm\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(3 :person_connector {col13: utc time: 12:10:10.000000, timezoneOffset: 0, col12: 2.200000047683716, col11: 1.2, col8: utc datetime: 2021-01-03T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 42, col10: false, col7: 2021-01-29, col4: 22, col5: 32, col2: \"Tim\u0000\u0000\u0000\u0000\u0000\", col3: 12, col1: \"Tim\"} :person {col13: utc time: 12:12:12.000000, 
timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: true, col6: 9223372036854775807, col7: 2019-01-03, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: 3, col1: \"ab\\tc\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(-2 :person_connector {col13: utc time: 21:10:10.000000, timezoneOffset: 0, col12: 3.0999999046325684, col11: 2.1, col8: utc datetime: 2021-03-12T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 51, col7: 2021-02-08, col4: 31, col5: 41, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 21, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col6: 6412233, col10: false, col7: 2019-01-01, col4: 1111, col5: 22222, col2: NULL, col3: -2, col1: NULL})]") + expectResultString.append( + "ColumnName: [v], Values: [(4 :person_connector {col13: utc time: 13:10:10.000000, timezoneOffset: 0, col12: 2.299999952316284, col11: 1.3, col8: utc datetime: 2021-01-04T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 43, col10: true, col7: 2021-01-30, col4: 23, col5: 33, col2: \"张三\u0000\u0000\", col3: 13, col1: \"张三\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col6: -9223372036854775808, col10: false, col7: 2019-01-04, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: 4, col1: \"a\\tbd\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(7 :person_connector {col13: utc time: 16:10:10.000000, timezoneOffset: 0, col12: 2.5999999046325684, col11: 1.6, col8: utc datetime: 2021-01-07T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: true, col6: 46, col7: 2021-02-03, col4: 26, col5: 36, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 16, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: -2147483648, col2: \"abcdefgh\", col3: 7, col1: \"abg\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(8 :person_connector {col13: utc time: 17:10:10.000000, timezoneOffset: 0, col12: 2.700000047683716, col11: 1.7, col8: utc datetime: 2021-01-08T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 47, col7: 2021-02-04, col4: 27, col5: 37, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 17, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 32767, col5: 0, col2: \"abcdefgh\", col3: 8, col1: \"abh\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(9 :person_connector {col13: utc time: 18:10:10.000000, timezoneOffset: 0, col12: 2.799999952316284, col11: 1.8, col8: utc datetime: 2021-01-09T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 48, col10: true, col7: 2021-02-05, col4: 28, col5: 38, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 18, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col6: 6412233, col10: false, col7: 2019-01-01, col4: -32767, 
col5: -32767, col2: \"abcdefgh\", col3: 9, col1: \"abi\"})]") + expectResultString.append( + "ColumnName: [v], Values: [(10 :person_connector {col13: utc time: 19:10:10.000000, timezoneOffset: 0, col12: 2.9000000953674316, col11: 1.9, col8: utc datetime: 2021-01-10T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 49, col10: false, col7: 2021-02-06, col4: 29, col5: 39, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 19, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: -32768, col5: 32767, col2: \"abcdefgh\", col3: 10, col1: \"abj\"})]") + + assert(resultString.containsSlice(expectResultString)) + } + + test("write edge into test_write_string space with insert mode") { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read.option("header", true).csv("src/test/resources/edge.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig + .builder() + .withSpace("test_write_string") + .withEdge("friend") + .withSrcIdField("id1") + .withDstIdField("id2") + .withRankField("col3") + .withRankAsProperty(true) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges() + + spark.stop() + + val addresses: List[Address] = List(new Address("127.0.0.1", 9559)) + val graphProvider = new GraphProvider(addresses) + + val createIndexResult: ResultSet = graphProvider.submit( + "use test_write_string; " + + "create edge index if not exists friend_index on friend(col1(20));") + Thread.sleep(5000) + graphProvider.submit( + "use test_write_string; " + + "rebuild edge index friend_index;") + + Thread.sleep(5000) + + val resultSet: ResultSet = + graphProvider.submit("use test_write_string;match (v:person) return v;") + assert(resultSet.getColumnNames.size() == 1) + assert(resultSet.getRows.size() == 13) + val resultString: ListBuffer[String] = new ListBuffer[String] + for (i <- 0 until resultSet.getRows.size) { + resultString.append(resultSet.rowValues(i).toString) + } + + val expectResultString: ListBuffer[String] = new ListBuffer[String] + expectResultString.append( + "ColumnName: [e], Values: [(2)-[:friend_connector@11{col13: utc time: 11:10:10.000000, timezoneOffset: 0, col12: 2.0999999046325684, col11: 1.1, col8: utc datetime: 2021-01-02T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 41, col10: false, col7: 2021-01-28, col4: 21, col5: 31, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 11, col1: \"Jina\"}]->(3)]") + expectResultString.append( + "ColumnName: [e], Values: [(6)-[:friend_connector@15{col13: utc time: 15:10:10.000000, timezoneOffset: 0, col12: 2.5, col11: 1.5, col8: utc datetime: 2021-01-06T12:10:10.000000, timezoneOffset: 0, col9: 0, col10: false, col6: 45, col7: 2021-02-02, col4: 25, col5: 35, col2: \"王五\u0000\u0000\", col3: 15, col1: \"王五\"}]->(7)]") + expectResultString.append( + "ColumnName: [e], Values: [(1)-[:friend_connector@10{col13: utc time: 10:10:10.000000, timezoneOffset: 0, col12: 2.0, col11: 1.0, col8: utc datetime: 
2021-01-01T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 40, col10: true, col7: 2021-01-27, col4: 20, col5: 30, col2: \"tom\u0000\u0000\u0000\u0000\u0000\", col3: 10, col1: \"Tom\"}]->(2)]") + expectResultString.append( + "ColumnName: [e], Values: [(-1)-[:friend_connector@20{col13: utc time: 20:10:10.000000, timezoneOffset: 0, col12: 3.0, col11: 2.0, col8: utc datetime: 2021-02-11T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 50, col7: 2021-02-07, col4: 30, col5: 40, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 20, col1: \"Jina\"}]->(5)]") + expectResultString.append( + "ColumnName: [e], Values: [(5)-[:friend_connector@14{col13: utc time: 14:10:10.000000, timezoneOffset: 0, col12: 2.4000000953674316, col11: 1.4, col8: utc datetime: 2021-01-05T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 44, col7: 2021-02-01, col4: 24, col5: 34, col2: \"李四\u0000\u0000\", col3: 14, col1: \"李四\"}]->(6)]") + expectResultString.append( + "ColumnName: [e], Values: [(-3)-[:friend_connector@22{col13: utc time: 22:10:10.000000, timezoneOffset: 0, col12: 3.200000047683716, col11: 2.2, col8: utc datetime: 2021-04-13T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 52, col10: false, col7: 2021-02-09, col4: 32, col5: 42, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 22, col1: \"Jina\"}]->(7)]") + expectResultString.append( + "ColumnName: [e], Values: [(3)-[:friend_connector@12{col13: utc time: 12:10:10.000000, timezoneOffset: 0, col12: 2.200000047683716, col11: 1.2, col8: utc datetime: 2021-01-03T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 42, col10: false, col7: 2021-01-29, col4: 22, col5: 32, col2: \"Tim\u0000\u0000\u0000\u0000\u0000\", col3: 12, col1: \"Tim\"}]->(4)]") + expectResultString.append( + "ColumnName: [e], Values: [(-2)-[:friend_connector@21{col13: utc time: 21:10:10.000000, timezoneOffset: 0, col12: 3.0999999046325684, col11: 2.1, col8: utc datetime: 2021-03-12T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 51, col10: false, col7: 2021-02-08, col4: 31, col5: 41, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 21, col1: \"Jina\"}]->(6)]") + expectResultString.append( + "ColumnName: [e], Values: [(4)-[:friend_connector@13{col13: utc time: 13:10:10.000000, timezoneOffset: 0, col12: 2.299999952316284, col11: 1.3, col8: utc datetime: 2021-01-04T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: true, col6: 43, col7: 2021-01-30, col4: 23, col5: 33, col2: \"张三\u0000\u0000\", col3: 13, col1: \"张三\"}]->(5)]") + expectResultString.append( + "ColumnName: [e], Values: [(7)-[:friend_connector@16{col13: utc time: 16:10:10.000000, timezoneOffset: 0, col12: 2.5999999046325684, col11: 1.6, col8: utc datetime: 2021-01-07T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 46, col10: true, col7: 2021-02-03, col4: 26, col5: 36, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 16, col1: \"Jina\"}]->(1)]") + expectResultString.append( + "ColumnName: [e], Values: [(8)-[:friend_connector@17{col13: utc time: 17:10:10.000000, timezoneOffset: 0, col12: 2.700000047683716, col11: 1.7, col8: utc datetime: 2021-01-08T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 47, col7: 2021-02-04, col4: 27, col5: 37, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 17, col1: \"Jina\"}]->(1)]") + expectResultString.append( + "ColumnName: [e], Values: [(9)-[:friend_connector@18{col13: utc time: 18:10:10.000000, timezoneOffset: 0, col12: 2.799999952316284, col11: 1.8, col8: utc datetime: 2021-01-09T12:10:10.000000, 
timezoneOffset: 0, col9: 43535232, col6: 48, col10: true, col7: 2021-02-05, col4: 28, col5: 38, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 18, col1: \"Jina\"}]->(1)]") + expectResultString.append( + "ColumnName: [e], Values: [(10)-[:friend_connector@19{col13: utc time: 19:10:10.000000, timezoneOffset: 0, col12: 2.9000000953674316, col11: 1.9, col8: utc datetime: 2021-01-10T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 49, col10: false, col7: 2021-02-06, col4: 29, col5: 39, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 19, col1: \"Jina\"}]->(2)]") + assert(resultString.containsSlice(expectResultString)) + } +} From 6c46cbc8dbd420396bb5c6d31675612061071bf5 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 19 Oct 2021 11:48:41 +0800 Subject: [PATCH 2/6] add utils test --- .../connector/mock/NebulaGraphMock.scala | 180 +++++++++++++++++- .../nebula/connector/mock/SparkMock.scala | 175 ++++++++++++++++- .../connector/nebula/GraphProviderTest.scala | 40 +++- .../connector/nebula/MetaProviderTest.scala | 99 +++++++++- 4 files changed, 488 insertions(+), 6 deletions(-) diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala index 3a213bbe..e3277bb4 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/NebulaGraphMock.scala @@ -1,8 +1,184 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ package com.vesoft.nebula.connector.mock -class NebulaGraphMock {} + +import com.vesoft.nebula.client.graph.NebulaPoolConfig +import com.vesoft.nebula.client.graph.data.HostAddress +import com.vesoft.nebula.client.graph.net.NebulaPool +import org.apache.log4j.Logger + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +class NebulaGraphMock { + private[this] val LOG = Logger.getLogger(this.getClass) + + @transient val nebulaPoolConfig = new NebulaPoolConfig + @transient val pool: NebulaPool = new NebulaPool + val address = new ListBuffer[HostAddress]() + address.append(new HostAddress("127.0.0.1", 9669)) + + val randAddr = scala.util.Random.shuffle(address) + pool.init(randAddr.asJava, nebulaPoolConfig) + + def mockStringIdGraph(): Unit = { + val session = pool.getSession("root", "nebula", true) + + val createSpace = "CREATE SPACE IF NOT EXISTS test_string(partition_num=10,vid_type=fixed_string(8));" + + "USE test_string;" + "CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);" + + "CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);"; + val createResp = session.execute(createSpace) + if (!createResp.isSucceeded) { + close() + LOG.error("create string type space failed," + createResp.getErrorMessage) + sys.exit(-1) + } + + Thread.sleep(10000) + val insertTag = + "INSERT VERTEX person(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13) VALUES " + + " \"1\":(\"person1\", \"person1\", 11, 200, 1000, 188888, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00\"),timestamp(\"2021-01-01T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"2\":(\"person2\", \"person2\", 12, 300, 2000, 288888, date(\"2021-01-02\"), datetime(\"2021-01-02T12:00:00\"),timestamp(\"2021-01-02T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"3\":(\"person3\", \"person3\", 13, 400, 3000, 388888, date(\"2021-01-03\"), datetime(\"2021-01-03T12:00:00\"),timestamp(\"2021-01-03T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"4\":(\"person4\", \"person4\", 14, 500, 4000, 488888, date(\"2021-01-04\"), datetime(\"2021-01-04T12:00:00\"),timestamp(\"2021-01-04T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"5\":(\"person5\", \"person5\", 15, 600, 5000, 588888, date(\"2021-01-05\"), datetime(\"2021-01-05T12:00:00\"),timestamp(\"2021-01-05T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"6\":(\"person6\", \"person6\", 16, 700, 6000, 688888, date(\"2021-01-06\"), datetime(\"2021-01-06T12:00:00\"),timestamp(\"2021-01-06T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"7\":(\"person7\", \"person7\", 17, 800, 7000, 788888, date(\"2021-01-07\"), datetime(\"2021-01-07T12:00:00\"),timestamp(\"2021-01-07T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"8\":(\"person8\", \"person8\", 18, 900, 8000, 888888, date(\"2021-01-08\"), datetime(\"2021-01-08T12:00:00\"),timestamp(\"2021-01-08T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"9\":(\"person9\", \"person9\", 19, 1000, 9000, 988888, date(\"2021-01-09\"), datetime(\"2021-01-09T12:00:00\"),timestamp(\"2021-01-09T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"10\":(\"person10\", \"person10\", 20, 1100, 10000, 1088888, 
date(\"2021-01-10\"), datetime(\"2021-01-10T12:00:00\"),timestamp(\"2021-01-10T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"11\":(\"person11\", \"person11\", 21, 1200, 11000, 1188888, date(\"2021-01-11\"), datetime(\"2021-01-11T12:00:00\"),timestamp(\"2021-01-11T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"12\":(\"person12\", \"person11\", 22, 1300, 12000, 1288888, date(\"2021-01-12\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"-1\":(\"person00\", \"person00\", 23, 1400, 13000, 1388888, date(\"2021-01-13\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"-2\":(\"person01\", \"person01\", 24, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"-3\":(\"person02\", \"person02\", 24, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"19\":(\"person19\", \"person22\", 25, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"22\":(\"person22\", \"person22\", 26, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))" + val insertTagResp = session.execute(insertTag) + if (!insertTagResp.isSucceeded) { + close() + LOG.error("insert vertex for string type space failed," + insertTagResp.getErrorMessage) + sys.exit(-1) + } + + val insertEdge = "INSERT EDGE friend(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13) VALUES " + + " \"1\" -> \"2\":(\"friend1\", \"friend2\", 11, 200, 1000, 188888, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00\"),timestamp(\"2021-01-01T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"2\" -> \"3\":(\"friend2\", \"friend3\", 12, 300, 2000, 288888, date(\"2021-01-02\"), datetime(\"2021-01-02T12:00:00\"),timestamp(\"2021-01-02T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"3\" -> \"4\":(\"friend3\", \"friend4\", 13, 400, 3000, 388888, date(\"2021-01-03\"), datetime(\"2021-01-03T12:00:00\"),timestamp(\"2021-01-03T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"4\" -> \"5\":(\"friend4\", \"friend4\", 14, 500, 4000, 488888, date(\"2021-01-04\"), datetime(\"2021-01-04T12:00:00\"),timestamp(\"2021-01-04T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"5\" -> \"6\":(\"friend5\", \"friend5\", 15, 600, 5000, 588888, date(\"2021-01-05\"), datetime(\"2021-01-05T12:00:00\"),timestamp(\"2021-01-05T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"6\" -> \"7\":(\"friend6\", \"friend6\", 16, 700, 6000, 688888, date(\"2021-01-06\"), datetime(\"2021-01-06T12:00:00\"),timestamp(\"2021-01-06T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"7\" -> \"8\":(\"friend7\", \"friend7\", 17, 800, 7000, 788888, date(\"2021-01-07\"), datetime(\"2021-01-07T12:00:00\"),timestamp(\"2021-01-07T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"8\" -> \"9\":(\"friend8\", \"friend8\", 18, 900, 8000, 888888, date(\"2021-01-08\"), datetime(\"2021-01-08T12:00:00\"),timestamp(\"2021-01-08T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"9\" -> \"10\":(\"friend9\", \"friend9\", 19, 1000, 9000, 988888, 
date(\"2021-01-09\"), datetime(\"2021-01-09T12:00:00\"),timestamp(\"2021-01-09T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"10\" -> \"11\":(\"friend10\", \"friend10\", 20, 1100, 10000, 1088888, date(\"2021-01-10\"), datetime(\"2021-01-10T12:00:00\"),timestamp(\"2021-01-10T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"11\" -> \"12\":(\"friend11\", \"friend11\", 21, 1200, 11000, 1188888, date(\"2021-01-11\"), datetime(\"2021-01-11T12:00:00\"),timestamp(\"2021-01-11T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " \"12\" -> \"1\":(\"friend12\", \"friend11\", 22, 1300, 12000, 1288888, date(\"2021-01-12\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"-1\" -> \"11\":(\"friend13\", \"friend12\", 22, 1300, 12000, 1288888, date(\"2021-01-12\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " \"-2\" -> \"-1\":(\"friend14\", \"friend13\", 22, 1300, 12000, 1288888, date(\"2021-01-12\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))" + val insertEdgeResp = session.execute(insertEdge) + if (!insertEdgeResp.isSucceeded) { + close() + LOG.error("insert edge for string type space failed," + insertEdgeResp.getErrorMessage) + sys.exit(-1) + } + } + + def mockIntIdGraph(): Unit = { + val session = pool.getSession("root", "nebula", true) + + val createSpace = "CREATE SPACE IF NOT EXISTS test_int(partition_num=10, vid_type=int64);" + + "USE test_int;" + "CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);" + + "CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);"; + val createResp = session.execute(createSpace) + if (!createResp.isSucceeded) { + close() + LOG.error("create int type space failed," + createResp.getErrorMessage) + sys.exit(-1) + } + + Thread.sleep(10000) + val insertTag = + "INSERT VERTEX person(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13) VALUES " + + " 1:(\"person1\", \"person1\", 11, 200, 1000, 188888, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00\"),timestamp(\"2021-01-01T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 2:(\"person2\", \"person2\", 12, 300, 2000, 288888, date(\"2021-01-02\"), datetime(\"2021-01-02T12:00:00\"),timestamp(\"2021-01-02T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 3:(\"person3\", \"person3\", 13, 400, 3000, 388888, date(\"2021-01-03\"), datetime(\"2021-01-03T12:00:00\"),timestamp(\"2021-01-03T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 4:(\"person4\", \"person4\", 14, 500, 4000, 488888, date(\"2021-01-04\"), datetime(\"2021-01-04T12:00:00\"),timestamp(\"2021-01-04T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 5:(\"person5\", \"person5\", 15, 600, 5000, 588888, date(\"2021-01-05\"), datetime(\"2021-01-05T12:00:00\"),timestamp(\"2021-01-05T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 6:(\"person6\", \"person6\", 16, 700, 6000, 688888, date(\"2021-01-06\"), datetime(\"2021-01-06T12:00:00\"),timestamp(\"2021-01-06T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 7:(\"person7\", \"person7\", 17, 800, 7000, 788888, 
date(\"2021-01-07\"), datetime(\"2021-01-07T12:00:00\"),timestamp(\"2021-01-07T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 8:(\"person8\", \"person8\", 18, 900, 8000, 888888, date(\"2021-01-08\"), datetime(\"2021-01-08T12:00:00\"),timestamp(\"2021-01-08T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 9:(\"person9\", \"person9\", 19, 1000, 9000, 988888, date(\"2021-01-09\"), datetime(\"2021-01-09T12:00:00\"),timestamp(\"2021-01-09T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 10:(\"person10\", \"person10\", 20, 1100, 10000, 1088888, date(\"2021-01-10\"), datetime(\"2021-01-10T12:00:00\"),timestamp(\"2021-01-10T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 11:(\"person11\", \"person11\", 21, 1200, 11000, 1188888, date(\"2021-01-11\"), datetime(\"2021-01-11T12:00:00\"),timestamp(\"2021-01-11T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 12:(\"person12\", \"person11\", 22, 1300, 12000, 1288888, date(\"2021-01-12\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " -1:(\"person00\", \"person00\", 23, 1400, 13000, 1388888, date(\"2021-01-13\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " -2:(\"person01\", \"person01\", 24, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " -3:(\"person02\", \"person02\", 24, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 19:(\"person19\", \"person22\", 25, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 22:(\"person22\", \"person22\", 26, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\")), " + + " 0:(null, null, null, null, null, null, null, null, null, null, null, null, null)" + val insertTagResp = session.execute(insertTag) + if (!insertTagResp.isSucceeded) { + close() + LOG.error("insert vertex for int type space failed," + insertTagResp.getErrorMessage) + sys.exit(-1) + } + + val insertEdge = "INSERT EDGE friend(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13) VALUES " + + " 1 -> 2:(\"friend1\", \"friend2\", 11, 200, 1000, 188888, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00\"),timestamp(\"2021-01-01T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 2 -> 3:(\"friend2\", \"friend3\", 12, 300, 2000, 288888, date(\"2021-01-02\"), datetime(\"2021-01-02T12:00:00\"),timestamp(\"2021-01-02T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 3 -> 4:(\"friend3\", \"friend4\", 13, 400, 3000, 388888, date(\"2021-01-03\"), datetime(\"2021-01-03T12:00:00\"),timestamp(\"2021-01-03T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 4 -> 5:(\"friend4\", \"friend4\", 14, 500, 4000, 488888, date(\"2021-01-04\"), datetime(\"2021-01-04T12:00:00\"),timestamp(\"2021-01-04T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 5 -> 6:(\"friend5\", \"friend5\", 15, 600, 5000, 588888, date(\"2021-01-05\"), datetime(\"2021-01-05T12:00:00\"),timestamp(\"2021-01-05T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 6 -> 7:(\"friend6\", \"friend6\", 16, 700, 6000, 688888, date(\"2021-01-06\"), 
datetime(\"2021-01-06T12:00:00\"),timestamp(\"2021-01-06T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 7 -> 8:(\"friend7\", \"friend7\", 17, 800, 7000, 788888, date(\"2021-01-07\"), datetime(\"2021-01-07T12:00:00\"),timestamp(\"2021-01-07T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 8 -> 9:(\"friend8\", \"friend8\", 18, 900, 8000, 888888, date(\"2021-01-08\"), datetime(\"2021-01-08T12:00:00\"),timestamp(\"2021-01-08T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 9 -> 10:(\"friend9\", \"friend9\", 19, 1000, 9000, 988888, date(\"2021-01-09\"), datetime(\"2021-01-09T12:00:00\"),timestamp(\"2021-01-09T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 10 -> 11:(\"friend10\", \"friend10\", 20, 1100, 10000, 1088888, date(\"2021-01-10\"), datetime(\"2021-01-10T12:00:00\"),timestamp(\"2021-01-10T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," + + " 11 -> 12:(\"friend11\", \"friend11\", 21, 1200, 11000, 1188888, date(\"2021-01-11\"), datetime(\"2021-01-11T12:00:00\"),timestamp(\"2021-01-11T12:00:00\"), false, 1.0, 2.0, time(\"12:01:01\"))," + + " 12 -> 1:(\"friend12\", \"friend11\", 22, 1300, 12000, 1288888, date(\"2021-01-12\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))" + val insertEdgeResp = session.execute(insertEdge) + if (!insertEdgeResp.isSucceeded) { + close() + LOG.error("insert edge for int type space failed," + insertEdgeResp.getErrorMessage) + sys.exit(-1) + } + } + + def mockStringIdGraphSchema(): Unit = { + val session = pool.getSession("root", "nebula", true) + + val createSpace = "CREATE SPACE IF NOT EXISTS test_write_string(partition_num=10,vid_type=fixed_string(8));" + + "USE test_write_string;" + "CREATE TAG IF NOT EXISTS person_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);" + + "CREATE EDGE IF NOT EXISTS friend_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);"; + val createResp = session.execute(createSpace) + if (!createResp.isSucceeded) { + close() + LOG.error("create string type space failed," + createResp.getErrorMessage) + sys.exit(-1) + } + } + + def mockIntIdGraphSchema(): Unit = { + val session = pool.getSession("root", "nebula", true) + + val createSpace = "CREATE SPACE IF NOT EXISTS test_write_int(partition_num=10, vid_type=int64);" + + "USE test_write_int;" + "CREATE TAG IF NOT EXISTS person_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);" + + "CREATE EDGE IF NOT EXISTS friend_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);"; + val createResp = session.execute(createSpace) + if (!createResp.isSucceeded) { + close() + LOG.error("create int type space failed," + createResp.getErrorMessage) + sys.exit(-1) + } + } + + def close(): Unit = { + pool.close() + } +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala index 47d69c4d..eb97bf83 100644 --- 
a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala @@ -3,7 +3,178 @@ * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ - -package com.vesoft.nebula.connector.mock class SparkMock { + +package com.vesoft.nebula.connector.mock + +import com.facebook.thrift.protocol.TCompactProtocol +import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter +import com.vesoft.nebula.connector.{ + NebulaConnectionConfig, + WriteMode, + WriteNebulaEdgeConfig, + WriteNebulaVertexConfig +} +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession + +object SparkMock { + + /** + * write nebula vertex with insert mode + */ + def writeVertex(): Unit = { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read + .option("header", true) + .csv("nebula-spark-connector/src/test/resources/vertex.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig + .builder() + .withSpace("test_write_string") + .withTag("person_connector") + .withVidField("id") + .withVidAsProp(false) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteVertexConfig).writeVertices() + + spark.stop() + } + + /** + * write nebula vertex with delete mode + */ + def deleteVertex(): Unit = { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read + .option("header", true) + .csv("nebula-spark-connector/src/test/resources/vertex.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig + .builder() + .withSpace("test_write_string") + .withTag("person_connector") + .withVidField("id") + .withVidAsProp(false) + .withWriteMode(WriteMode.DELETE) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteVertexConfig).writeVertices() + + spark.stop() + } + + /** + * write nebula edge with insert mode + */ + def writeEdge(): Unit = { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read + .option("header", true) + .csv("nebula-spark-connector/src/test/resources/edge.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig + .builder() + .withSpace("test_write_string") + 
.withEdge("friend_connector") + .withSrcIdField("id1") + .withDstIdField("id2") + .withRankField("col3") + .withRankAsProperty(true) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges() + + spark.stop() + } + + /** + * write nebula edge with delete mode + */ + def deleteEdge(): Unit = { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read + .option("header", true) + .csv("nebula-spark-connector/src/test/resources/edge.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig + .builder() + .withSpace("test_write_string") + .withEdge("friend_connector") + .withSrcIdField("id1") + .withDstIdField("id2") + .withRankField("col3") + .withRankAsProperty(true) + .withWriteMode(WriteMode.DELETE) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges() + + spark.stop() + } } diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala index b2577f4d..a18432a2 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala @@ -1,2 +1,40 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + package com.vesoft.nebula.connector.nebula -class GraphProviderTest {} + +import com.vesoft.nebula.connector.connector.Address +import com.vesoft.nebula.connector.mock.NebulaGraphMock +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +class GraphProviderTest extends AnyFunSuite with BeforeAndAfterAll { + + var graphProvider: GraphProvider = null + + override def beforeAll(): Unit = { + val addresses: List[Address] = List(new Address("127.0.0.1", 9669)) + graphProvider = new GraphProvider(addresses) + val graphMock = new NebulaGraphMock + graphMock.mockIntIdGraph() + graphMock.mockStringIdGraph() + graphMock.close() + } + + override def afterAll(): Unit = { + graphProvider.close() + } + + test("switchSpace") { + assert(!graphProvider.switchSpace("root", "nebula", "not_exit_space")) + assert(graphProvider.switchSpace("root", "nebula", "test_int")) + } + + test("submit") { + val result = graphProvider.submit("insert vertex person(col1) values 100:(\"test\")") + assert(result.isSucceeded) + } +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala index 670261f1..c8a4ddef 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala @@ -1,2 +1,99 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. 
+ * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + package com.vesoft.nebula.connector.nebula -class MetaProviderTest {} + +import com.vesoft.nebula.connector.DataTypeEnum +import com.vesoft.nebula.connector.connector.Address +import com.vesoft.nebula.connector.mock.NebulaGraphMock +import com.vesoft.nebula.meta.{PropertyType, Schema} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +class MetaProviderTest extends AnyFunSuite with BeforeAndAfterAll { + var metaProvider: MetaProvider = null + + override def beforeAll(): Unit = { + val addresses: List[Address] = List(new Address("127.0.0.1", 9559)) + metaProvider = new MetaProvider(addresses) + + val graphMock = new NebulaGraphMock + graphMock.mockStringIdGraph() + graphMock.mockIntIdGraph() + graphMock.close() + } + + override def afterAll(): Unit = { + metaProvider.close() + } + + test("getPartitionNumber") { + assert(metaProvider.getPartitionNumber("test_int") == 10) + assert(metaProvider.getPartitionNumber("test_string") == 10) + } + + test("getVidType") { + assert(metaProvider.getVidType("test_int") == VidType.INT) + assert(metaProvider.getVidType("test_string") == VidType.STRING) + } + + test("getTag") { + val schema: Schema = metaProvider.getTag("test_int", "person") + assert(schema.columns.size() == 13) + + val schema1: Schema = metaProvider.getTag("test_string", "person") + assert(schema1.columns.size() == 13) + } + + test("getEdge") { + val schema: Schema = metaProvider.getEdge("test_int", "friend") + assert(schema.columns.size() == 13) + + val schema1: Schema = metaProvider.getEdge("test_string", "friend") + assert(schema1.columns.size() == 13) + } + + test("getTagSchema") { + val schemaMap: Map[String, Integer] = metaProvider.getTagSchema("test_int", "person") + assert(schemaMap.size == 13) + assert(schemaMap("col1") == PropertyType.STRING.getValue) + assert(schemaMap("col2") == PropertyType.FIXED_STRING.getValue) + assert(schemaMap("col3") == PropertyType.INT8.getValue) + assert(schemaMap("col4") == PropertyType.INT16.getValue) + assert(schemaMap("col5") == PropertyType.INT32.getValue) + assert(schemaMap("col6") == PropertyType.INT64.getValue) + assert(schemaMap("col7") == PropertyType.DATE.getValue) + assert(schemaMap("col8") == PropertyType.DATETIME.getValue) + assert(schemaMap("col9") == PropertyType.TIMESTAMP.getValue) + assert(schemaMap("col10") == PropertyType.BOOL.getValue) + assert(schemaMap("col11") == PropertyType.DOUBLE.getValue) + assert(schemaMap("col12") == PropertyType.FLOAT.getValue) + assert(schemaMap("col13") == PropertyType.TIME.getValue) + } + + test("getEdgeSchema") { + val schemaMap: Map[String, Integer] = metaProvider.getEdgeSchema("test_int", "friend") + assert(schemaMap.size == 13) + assert(schemaMap("col1") == PropertyType.STRING.getValue) + assert(schemaMap("col2") == PropertyType.FIXED_STRING.getValue) + assert(schemaMap("col3") == PropertyType.INT8.getValue) + assert(schemaMap("col4") == PropertyType.INT16.getValue) + assert(schemaMap("col5") == PropertyType.INT32.getValue) + assert(schemaMap("col6") == PropertyType.INT64.getValue) + assert(schemaMap("col7") == PropertyType.DATE.getValue) + assert(schemaMap("col8") == PropertyType.DATETIME.getValue) + assert(schemaMap("col9") == PropertyType.TIMESTAMP.getValue) + assert(schemaMap("col10") == PropertyType.BOOL.getValue) + assert(schemaMap("col11") == PropertyType.DOUBLE.getValue) + assert(schemaMap("col12") == 
PropertyType.FLOAT.getValue) + assert(schemaMap("col13") == PropertyType.TIME.getValue) + } + + test("getLabelType") { + assert(metaProvider.getLabelType("test_int", "person") == DataTypeEnum.VERTEX) + assert(metaProvider.getLabelType("test_int", "friend") == DataTypeEnum.EDGE) + } +} From 50b726d7caee2676b19c5d6104e102bc4d09ab09 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 19 Oct 2021 11:58:11 +0800 Subject: [PATCH 3/6] add utils test --- .../nebula/connector/DataTypeEnumSuite.scala | 20 ++- .../nebula/connector/NebulaUtilsSuite.scala | 96 ++++++++++++++- .../connector/PartitionUtilsSuite.scala | 116 +++++++++++++++++- 3 files changed, 229 insertions(+), 3 deletions(-) diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala index 8c4c2ec3..232d31f4 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/DataTypeEnumSuite.scala @@ -1,3 +1,21 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + package com.vesoft.nebula.connector -class DataTypeEnumTest {} +import org.scalatest.funsuite.AnyFunSuite + +class DataTypeEnumSuite extends AnyFunSuite { + + test("validDataType") { + assert(DataTypeEnum.validDataType("vertex")) + assert(DataTypeEnum.validDataType("VERTEX")) + assert(DataTypeEnum.validDataType("edge")) + assert(DataTypeEnum.validDataType("EDGE")) + assert(!DataTypeEnum.validDataType("relation")) + } + +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala index 9f1ba7d6..38ff8952 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/NebulaUtilsSuite.scala @@ -1,3 +1,97 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + package com.vesoft.nebula.connector -class NebulaUtilsTest {} +import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, PropertyType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + LongType, + StringType, + StructField, + StructType +} +import org.scalatest.funsuite.AnyFunSuite + +class NebulaUtilsSuite extends AnyFunSuite { + + test("convertDataType") { + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.VID)) == LongType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.INT8)) == LongType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.INT16)) == LongType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.INT32)) == LongType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.INT64)) == LongType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.TIMESTAMP)) == LongType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.BOOL)) == BooleanType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.FLOAT)) == DoubleType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.DOUBLE)) == DoubleType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.FIXED_STRING)) == StringType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.STRING)) == StringType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.DATE)) == StringType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.DATETIME)) == StringType) + assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.TIME)) == StringType) + assertThrows[IllegalArgumentException]( + NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.UNKNOWN))) + } + + test("getColDataType") { + val columnDefs: List[ColumnDef] = List( + new ColumnDef("col1".getBytes(), new ColumnTypeDef(PropertyType.INT8)), + new ColumnDef("col2".getBytes(), new ColumnTypeDef(PropertyType.DOUBLE)), + new ColumnDef("col3".getBytes(), new ColumnTypeDef(PropertyType.STRING)), + new ColumnDef("col4".getBytes(), new ColumnTypeDef(PropertyType.DATE)), + new ColumnDef("col5".getBytes(), new ColumnTypeDef(PropertyType.DATETIME)), + new ColumnDef("col6".getBytes(), new ColumnTypeDef(PropertyType.TIME)), + new ColumnDef("col7".getBytes(), new ColumnTypeDef(PropertyType.TIMESTAMP)), + new ColumnDef("col8".getBytes(), new ColumnTypeDef(PropertyType.BOOL)) + ) + assert(NebulaUtils.getColDataType(columnDefs, "col1") == LongType) + assert(NebulaUtils.getColDataType(columnDefs, "col2") == DoubleType) + assert(NebulaUtils.getColDataType(columnDefs, "col3") == StringType) + assert(NebulaUtils.getColDataType(columnDefs, "col4") == StringType) + assert(NebulaUtils.getColDataType(columnDefs, "col5") == StringType) + assert(NebulaUtils.getColDataType(columnDefs, "col6") == StringType) + assert(NebulaUtils.getColDataType(columnDefs, "col7") == LongType) + assert(NebulaUtils.getColDataType(columnDefs, "col8") == BooleanType) + assertThrows[IllegalArgumentException](NebulaUtils.getColDataType(columnDefs, "col9")) + } + + test("makeGetters") { + val schema = StructType( + List( + StructField("col1", LongType, nullable = false), + StructField("col2", LongType, nullable = true) + )) + assert(NebulaUtils.makeGetters(schema).length == 2) + } + + test("isNumic") { + assert(NebulaUtils.isNumic("123")) + assert(NebulaUtils.isNumic("-123")) + assert(!NebulaUtils.isNumic("1.0")) + assert(!NebulaUtils.isNumic("a123")) + assert(!NebulaUtils.isNumic("123b")) + } 
+ + test("escapeUtil") { + assert(NebulaUtils.escapeUtil("123").equals("123")) + // a\bc -> a\\bc + assert(NebulaUtils.escapeUtil("a\bc").equals("a\\bc")) + // a\tbc -> a\\tbc + assert(NebulaUtils.escapeUtil("a\tbc").equals("a\\tbc")) + // a\nbc -> a\\nbc + assert(NebulaUtils.escapeUtil("a\nbc").equals("a\\nbc")) + // a\"bc -> a\\"bc + assert(NebulaUtils.escapeUtil("a\"bc").equals("a\\\"bc")) + // a\'bc -> a\\'bc + assert(NebulaUtils.escapeUtil("a\'bc").equals("a\\'bc")) + // a\rbc -> a\\rbc + assert(NebulaUtils.escapeUtil("a\rbc").equals("a\\rbc")) + // a\bbc -> a\\bbc + assert(NebulaUtils.escapeUtil("a\bbc").equals("a\\bbc")) + } +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala index 436ffc6d..dc46d8bb 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala @@ -1,3 +1,117 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + package com.vesoft.nebula.connector -class PartitionUtilsTest extends org.scalatest.FunSuite {} +import org.scalatest.funsuite.AnyFunSuite + +/** + * base data: spark partition is 10 + */ +class PartitionUtilsSuite extends AnyFunSuite { + val partition: Int = 10 + + test("getScanParts: nebula part is the same with spark partition") { + val nebulaPart: Int = 10 + val partsForIndex0 = PartitionUtils.getScanParts(1, nebulaPart, partition) + assert(partsForIndex0.size == 1) + assert(partsForIndex0.head == 1) + + val partsForIndex1 = PartitionUtils.getScanParts(2, nebulaPart, partition) + assert(partsForIndex1.head == 2) + + val partsForIndex2 = PartitionUtils.getScanParts(3, nebulaPart, partition) + assert(partsForIndex2.head == 3) + + val partsForIndex3 = PartitionUtils.getScanParts(4, nebulaPart, partition) + assert(partsForIndex3.head == 4) + + val partsForIndex4 = PartitionUtils.getScanParts(5, nebulaPart, partition) + assert(partsForIndex4.head == 5) + + val partsForIndex5 = PartitionUtils.getScanParts(6, nebulaPart, partition) + assert(partsForIndex5.head == 6) + + val partsForIndex6 = PartitionUtils.getScanParts(7, nebulaPart, partition) + assert(partsForIndex6.head == 7) + + val partsForIndex7 = PartitionUtils.getScanParts(8, nebulaPart, partition) + assert(partsForIndex7.head == 8) + + val partsForIndex8 = PartitionUtils.getScanParts(9, nebulaPart, partition) + assert(partsForIndex8.head == 9) + + val partsForIndex9 = PartitionUtils.getScanParts(10, nebulaPart, partition) + assert(partsForIndex9.head == 10) + } + + test("getScanParts: nebula part is more than spark partition") { + val nebulaPart: Int = 20 + val partsForIndex0 = PartitionUtils.getScanParts(1, nebulaPart, partition) + assert(partsForIndex0.contains(1) && partsForIndex0.contains(11)) + + val partsForIndex1 = PartitionUtils.getScanParts(2, nebulaPart, partition) + assert(partsForIndex1.contains(2) && partsForIndex1.contains(12)) + + val partsForIndex2 = PartitionUtils.getScanParts(3, nebulaPart, partition) + assert(partsForIndex2.contains(3) && partsForIndex2.contains(13)) + + val partsForIndex3 = PartitionUtils.getScanParts(4, nebulaPart, partition) + assert(partsForIndex3.contains(4) && partsForIndex3.contains(14)) + + val partsForIndex4 = 
PartitionUtils.getScanParts(5, nebulaPart, partition) + assert(partsForIndex4.contains(5) && partsForIndex4.contains(15)) + + val partsForIndex5 = PartitionUtils.getScanParts(6, nebulaPart, partition) + assert(partsForIndex5.contains(6) && partsForIndex5.contains(16)) + + val partsForIndex6 = PartitionUtils.getScanParts(7, nebulaPart, partition) + assert(partsForIndex6.contains(7) && partsForIndex6.contains(17)) + + val partsForIndex7 = PartitionUtils.getScanParts(8, nebulaPart, partition) + assert(partsForIndex7.contains(8) && partsForIndex7.contains(18)) + + val partsForIndex8 = PartitionUtils.getScanParts(9, nebulaPart, partition) + assert(partsForIndex8.contains(9) && partsForIndex8.contains(19)) + + val partsForIndex9 = PartitionUtils.getScanParts(10, nebulaPart, partition) + assert(partsForIndex9.contains(10) && partsForIndex9.contains(20)) + } + + test("getScanParts: nebula part is less than spark partition") { + val nebulaPart: Int = 5 + val partsForIndex0 = PartitionUtils.getScanParts(1, nebulaPart, partition) + assert(partsForIndex0.contains(1)) + + val partsForIndex1 = PartitionUtils.getScanParts(2, nebulaPart, partition) + assert(partsForIndex1.contains(2)) + + val partsForIndex2 = PartitionUtils.getScanParts(3, nebulaPart, partition) + assert(partsForIndex2.contains(3)) + + val partsForIndex3 = PartitionUtils.getScanParts(4, nebulaPart, partition) + assert(partsForIndex3.contains(4)) + + val partsForIndex4 = PartitionUtils.getScanParts(5, nebulaPart, partition) + assert(partsForIndex4.contains(5)) + + val partsForIndex5 = PartitionUtils.getScanParts(6, nebulaPart, partition) + assert(partsForIndex5.isEmpty) + + val partsForIndex6 = PartitionUtils.getScanParts(7, nebulaPart, partition) + assert(partsForIndex6.isEmpty) + + val partsForIndex7 = PartitionUtils.getScanParts(8, nebulaPart, partition) + assert(partsForIndex7.isEmpty) + + val partsForIndex8 = PartitionUtils.getScanParts(9, nebulaPart, partition) + assert(partsForIndex8.isEmpty) + + val partsForIndex9 = PartitionUtils.getScanParts(10, nebulaPart, partition) + assert(partsForIndex9.isEmpty) + } + +} From aa808d19fc9fd30b8e850b3b181f0e420d497e55 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 19 Oct 2021 15:07:52 +0800 Subject: [PATCH 4/6] fix data path --- .../com/vesoft/nebula/connector/mock/SparkMock.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala index eb97bf83..2837f60c 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala @@ -35,7 +35,7 @@ object SparkMock { val df = spark.read .option("header", true) - .csv("nebula-spark-connector/src/test/resources/vertex.csv") + .csv("src/test/resources/vertex.csv") val config = NebulaConnectionConfig @@ -73,7 +73,7 @@ object SparkMock { val df = spark.read .option("header", true) - .csv("nebula-spark-connector/src/test/resources/vertex.csv") + .csv("src/test/resources/vertex.csv") val config = NebulaConnectionConfig @@ -112,7 +112,7 @@ object SparkMock { val df = spark.read .option("header", true) - .csv("nebula-spark-connector/src/test/resources/edge.csv") + .csv("src/test/resources/edge.csv") val config = NebulaConnectionConfig @@ -152,7 +152,7 @@ object SparkMock { val df = spark.read .option("header", true) - 
.csv("nebula-spark-connector/src/test/resources/edge.csv") + .csv("src/test/resources/edge.csv") val config = NebulaConnectionConfig From ee0e288d3c9118ca606e4ab6e280f708ba7e7568 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 19 Oct 2021 15:20:03 +0800 Subject: [PATCH 5/6] add write and read test --- .../connector/nebula/GraphProviderTest.scala | 2 +- .../nebula/connector/reader/ReadSuite.scala | 265 +++++++++++++++++- .../connector/writer/WriteDeleteSuite.scala | 45 ++- .../connector/writer/WriteInsertSuite.scala | 171 ++--------- 4 files changed, 327 insertions(+), 156 deletions(-) diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala index a18432a2..fa2399ac 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/GraphProviderTest.scala @@ -34,7 +34,7 @@ class GraphProviderTest extends AnyFunSuite with BeforeAndAfterAll { } test("submit") { - val result = graphProvider.submit("insert vertex person(col1) values 100:(\"test\")") + val result = graphProvider.submit("fetch prop on person 1") assert(result.isSucceeded) } } diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala index f5017bfb..2b5ac979 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/reader/ReadSuite.scala @@ -1,8 +1,269 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ package com.vesoft.nebula.connector.reader -class ReadSuite {} + +import com.facebook.thrift.protocol.TCompactProtocol +import com.vesoft.nebula.connector.connector.NebulaDataFrameReader +import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig} +import com.vesoft.nebula.connector.mock.NebulaGraphMock +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoders, SparkSession} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +class ReadSuite extends AnyFunSuite with BeforeAndAfterAll { + + var sparkSession: SparkSession = null + + override def beforeAll(): Unit = { + val graphMock = new NebulaGraphMock + graphMock.mockIntIdGraph() + graphMock.close() + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + sparkSession = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + } + + override def afterAll(): Unit = { + sparkSession.stop() + } + + test("read vertex with no properties") { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withConenctionRetry(2) + .build() + val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("person") + .withNoColumn(true) + .withLimit(10) + .withPartitionNum(10) + .build() + val vertex = sparkSession.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF() + vertex.printSchema() + vertex.show() + assert(vertex.count() == 18) + assert(vertex.schema.fields.length == 1) + } + + test("read vertex with specific properties") { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withConenctionRetry(2) + .build() + val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("person") + .withNoColumn(false) + .withReturnCols(List("col1")) + .withLimit(10) + .withPartitionNum(10) + .build() + val vertex = sparkSession.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF() + vertex.printSchema() + vertex.show() + assert(vertex.count() == 18) + assert(vertex.schema.fields.length == 2) + + vertex.map(row => { + row.getAs[Long]("_vertexId") match { + case 1L => { + assert(row.getAs[String]("col1").equals("person1")) + } + case 0L => { + assert(row.isNullAt(1)) + } + } + "" + })(Encoders.STRING) + + } + + test("read vertex with all properties") { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withConenctionRetry(2) + .build() + val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("person") + .withNoColumn(false) + .withReturnCols(List()) + .withLimit(10) + .withPartitionNum(10) + .build() + val vertex = sparkSession.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF() + vertex.printSchema() + vertex.show() + assert(vertex.count() == 18) + assert(vertex.schema.fields.length == 14) + + vertex.map(row => { + row.getAs[Long]("_vertexId") match { + case 1L => { + assert(row.getAs[String]("col1").equals("person1")) + assert(row.getAs[String]("col2").equals("person1")) + assert(row.getAs[Long]("col3") == 11) + assert(row.getAs[Long]("col4") == 200) + assert(row.getAs[Long]("col5") == 1000) + assert(row.getAs[Long]("col6") == 188888) + assert(row.getAs[String]("col7").equals("2021-01-01")) + 
assert(row.getAs[String]("col8").equals("2021-01-01T12:00:00.000")) + assert(row.getAs[Long]("col9") == 1609502400) + assert(row.getAs[Boolean]("col10")) + assert(row.getAs[Double]("col11") < 1.001) + assert(row.getAs[Double]("col12") < 2.001) + assert(row.getAs[String]("col13").equals("12:01:01")) + } + case 0L => { + assert(row.isNullAt(1)) + assert(row.isNullAt(2)) + assert(row.isNullAt(3)) + assert(row.isNullAt(4)) + assert(row.isNullAt(5)) + assert(row.isNullAt(6)) + assert(row.isNullAt(7)) + assert(row.isNullAt(8)) + assert(row.isNullAt(9)) + assert(row.isNullAt(10)) + assert(row.isNullAt(11)) + assert(row.isNullAt(12)) + assert(row.isNullAt(13)) + } + } + "" + })(Encoders.STRING) + } + + test("read edge with no properties") { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withConenctionRetry(2) + .build() + val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("friend") + .withNoColumn(true) + .withLimit(10) + .withPartitionNum(10) + .build() + val edge = sparkSession.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF() + edge.printSchema() + edge.show() + assert(edge.count() == 12) + assert(edge.schema.fields.length == 3) + + edge.map(row => { + row.getAs[Long]("_srcId") match { + case 1L => { + assert(row.getAs[Long]("_dstId") == 2) + assert(row.getAs[Long]("_rank") == 0) + } + } + "" + })(Encoders.STRING) + } + + test("read edge with specific properties") { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withConenctionRetry(2) + .build() + val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("friend") + .withNoColumn(false) + .withReturnCols(List("col1")) + .withLimit(10) + .withPartitionNum(10) + .build() + val edge = sparkSession.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF() + edge.printSchema() + edge.show(20) + assert(edge.count() == 12) + assert(edge.schema.fields.length == 4) + edge.map(row => { + row.getAs[Long]("_srcId") match { + case 1L => { + assert(row.getAs[Long]("_dstId") == 2) + assert(row.getAs[Long]("_rank") == 0) + assert(row.getAs[String]("col1").equals("friend1")) + } + } + "" + })(Encoders.STRING) + } + + test("read edge with all properties") { + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withConenctionRetry(2) + .build() + val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test_int") + .withLabel("friend") + .withNoColumn(false) + .withReturnCols(List()) + .withLimit(10) + .withPartitionNum(10) + .build() + val edge = sparkSession.read.nebula(config, nebulaReadVertexConfig).loadEdgesToDF() + edge.printSchema() + edge.show() + assert(edge.count() == 12) + assert(edge.schema.fields.length == 16) + + edge.map(row => { + row.getAs[Long]("_srcId") match { + case 1L => { + assert(row.getAs[Long]("_dstId") == 2) + assert(row.getAs[Long]("_rank") == 0) + assert(row.getAs[String]("col1").equals("friend1")) + assert(row.getAs[String]("col2").equals("friend2")) + assert(row.getAs[Long]("col3") == 11) + assert(row.getAs[Long]("col4") == 200) + assert(row.getAs[Long]("col5") == 1000) + assert(row.getAs[Long]("col6") == 188888) + assert(row.getAs[String]("col7").equals("2021-01-01")) + assert(row.getAs[String]("col8").equals("2021-01-01T12:00:00.000")) + assert(row.getAs[Long]("col9") == 1609502400) + assert(row.getAs[Boolean]("col10")) + 
assert(row.getAs[Double]("col11") < 1.001) + assert(row.getAs[Double]("col12") < 2.001) + assert(row.getAs[String]("col13").equals("12:01:01")) + } + } + "" + })(Encoders.STRING) + } + +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala index 5a9332a8..ee557c43 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala @@ -1,8 +1,49 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ package com.vesoft.nebula.connector.writer -class WriteDeleteSuite {} + +import com.vesoft.nebula.client.graph.data.ResultSet +import com.vesoft.nebula.connector.connector.Address +import com.vesoft.nebula.connector.mock.{NebulaGraphMock, SparkMock} +import com.vesoft.nebula.connector.nebula.GraphProvider +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +class WriteDeleteSuite extends AnyFunSuite with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + val graphMock = new NebulaGraphMock + graphMock.mockStringIdGraphSchema() + graphMock.mockIntIdGraphSchema() + graphMock.close() + SparkMock.writeVertex() + } + + test("write vertex into test_write_string space with delete mode") { + SparkMock.deleteVertex() + val addresses: List[Address] = List(new Address("127.0.0.1", 9669)) + val graphProvider = new GraphProvider(addresses) + + graphProvider.switchSpace("root", "nebula", "test_write_string") + val resultSet: ResultSet = + graphProvider.submit("use test_write_string;match (v:person_connector) return v;") + assert(resultSet.getColumnNames.size() == 0) + assert(resultSet.isEmpty) + } + + test("write edge into test_write_string space with delete mode") { + SparkMock.deleteEdge() + val addresses: List[Address] = List(new Address("127.0.0.1", 9669)) + val graphProvider = new GraphProvider(addresses) + + graphProvider.switchSpace("root", "nebula", "test_write_string") + val resultSet: ResultSet = + graphProvider.submit("use test_write_string;fetch prop on friend_connector 1->2@10") + assert(resultSet.getColumnNames.size() == 0) + assert(resultSet.isEmpty) + } +} diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala index afcaa176..773d0494 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteInsertSuite.scala @@ -6,24 +6,14 @@ package com.vesoft.nebula.connector.writer -import com.facebook.thrift.protocol.TCompactProtocol import com.vesoft.nebula.client.graph.data.ResultSet -import com.vesoft.nebula.connector.connector.{Address, NebulaDataFrameWriter} -import com.vesoft.nebula.connector.{ - NebulaConnectionConfig, - WriteNebulaEdgeConfig, - WriteNebulaVertexConfig -} -import com.vesoft.nebula.connector.mock.NebulaGraphMock +import com.vesoft.nebula.connector.connector.{Address} +import com.vesoft.nebula.connector.mock.{NebulaGraphMock, SparkMock} import 
com.vesoft.nebula.connector.nebula.GraphProvider -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession import org.scalatest.BeforeAndAfterAll import org.scalatest.funsuite.AnyFunSuite -import scala.collection.mutable.ListBuffer - -class WriteSuite extends AnyFunSuite with BeforeAndAfterAll { +class WriteInsertSuite extends AnyFunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { val graphMock = new NebulaGraphMock @@ -33,173 +23,52 @@ class WriteSuite extends AnyFunSuite with BeforeAndAfterAll { } test("write vertex into test_write_string space with insert mode") { - val sparkConf = new SparkConf - sparkConf - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) - val spark = SparkSession - .builder() - .master("local") - .config(sparkConf) - .getOrCreate() - - val df = spark.read.option("header", true).csv("src/test/resources/vertex.csv") - - val config = - NebulaConnectionConfig - .builder() - .withMetaAddress("127.0.0.1:9559") - .withGraphAddress("127.0.0.1:9669") - .withConenctionRetry(2) - .build() - val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig - .builder() - .withSpace("test_write_string") - .withTag("person") - .withVidField("id") - .withVidAsProp(false) - .withBatch(5) - .build() - df.write.nebula(config, nebulaWriteVertexConfig).writeVertices() - - spark.stop() - - val addresses: List[Address] = List(new Address("127.0.0.1", 9559)) + SparkMock.writeVertex() + val addresses: List[Address] = List(new Address("127.0.0.1", 9669)) val graphProvider = new GraphProvider(addresses) + graphProvider.switchSpace("root", "nebula", "test_write_string") val createIndexResult: ResultSet = graphProvider.submit( "use test_write_string; " - + "create tag index if not exists person_index on person(col1(20));") + + "create tag index if not exists person_index on person_connector(col1(20));") Thread.sleep(5000) - graphProvider.submit( - "use test_write_string; " - + "rebuild tag index person_index;") + graphProvider.submit("rebuild tag index person_index;") Thread.sleep(5000) val resultSet: ResultSet = - graphProvider.submit("use test_write_string;match (v:person) return v;") + graphProvider.submit("use test_write_string;match (v:person_connector) return v;") assert(resultSet.getColumnNames.size() == 1) assert(resultSet.getRows.size() == 13) - val resultString: ListBuffer[String] = new ListBuffer[String] + for (i <- 0 until resultSet.getRows.size) { - resultString.append(resultSet.rowValues(i).toString) + println(resultSet.rowValues(i).toString) } - - val expectResultString: ListBuffer[String] = new ListBuffer[String] - expectResultString.append( - "ColumnName: [v], Values: [(2 :person_connector {col13: utc time: 11:10:10.000000, timezoneOffset: 0, col12: 2.0999999046325684, col11: 1.1, col8: utc datetime: 2021-01-02T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 41, col10: false, col7: 2021-01-28, col4: 21, col5: 31, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 11, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 0, col10: true, col6: 0, col7: 2019-01-02, col4: 1112, col5: 22222, col2: \"abcdefgh\", col3: 2, col1: \"abb\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(6 :person_connector {col13: utc time: 15:10:10.000000, timezoneOffset: 0, col12: 2.5, col11: 1.5, col8: utc datetime: 
2021-01-06T12:10:10.000000, timezoneOffset: 0, col9: 0, col10: false, col6: 45, col7: 2021-02-02, col4: 25, col5: 35, col2: \"王五\u0000\u0000\", col3: 15, col1: \"王五\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: true, col6: 0, col7: 2021-12-12, col4: 1111, col5: 2147483647, col2: \"abcdefgh\", col3: 6, col1: \"ab\\sf\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(1 :person_connector {col13: utc time: 10:10:10.000000, timezoneOffset: 0, col12: 2.0, col11: 1.0, col8: utc datetime: 2021-01-01T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 40, col10: true, col7: 2021-01-27, col4: 20, col5: 30, col2: \"tom\u0000\u0000\u0000\u0000\u0000\", col3: 10, col1: \"Tom\"} :person {col13: utc time: 11:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: 1, col1: \"aba\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(-1 :person_connector {col13: utc time: 20:10:10.000000, timezoneOffset: 0, col12: 3.0, col11: 2.0, col8: utc datetime: 2021-02-11T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 50, col7: 2021-02-07, col4: 30, col5: 40, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 20, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: -1, col1: \"\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(5 :person_connector {col13: utc time: 14:10:10.000000, timezoneOffset: 0, col12: 2.4000000953674316, col11: 1.4, col8: utc datetime: 2021-01-05T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 44, col10: false, col7: 2021-02-01, col4: 24, col5: 34, col2: \"李四\u0000\u0000\", col3: 14, col1: \"李四\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 0.0, col11: 0.0, col8: utc datetime: 1970-01-01T00:00:01.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 0, col7: 1970-01-01, col4: 1111, col5: 2147483647, col2: \"abcdefgh\", col3: 5, col1: \"a\\\"be\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(-3 :person_connector {col13: utc time: 22:10:10.000000, timezoneOffset: 0, col12: 3.200000047683716, col11: 2.2, col8: utc datetime: 2021-04-13T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 52, col7: 2021-02-09, col4: 32, col5: 42, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 22, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 10.0, col11: 10.0, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: 22222, col2: \"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\", col3: -3, col1: \"abm\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(3 :person_connector {col13: utc time: 12:10:10.000000, timezoneOffset: 0, col12: 2.200000047683716, col11: 1.2, col8: utc datetime: 2021-01-03T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 42, col10: false, col7: 2021-01-29, col4: 22, col5: 32, col2: \"Tim\u0000\u0000\u0000\u0000\u0000\", col3: 12, col1: \"Tim\"} :person {col13: 
utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: true, col6: 9223372036854775807, col7: 2019-01-03, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: 3, col1: \"ab\\tc\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(-2 :person_connector {col13: utc time: 21:10:10.000000, timezoneOffset: 0, col12: 3.0999999046325684, col11: 2.1, col8: utc datetime: 2021-03-12T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 51, col7: 2021-02-08, col4: 31, col5: 41, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 21, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col6: 6412233, col10: false, col7: 2019-01-01, col4: 1111, col5: 22222, col2: NULL, col3: -2, col1: NULL})]") - expectResultString.append( - "ColumnName: [v], Values: [(4 :person_connector {col13: utc time: 13:10:10.000000, timezoneOffset: 0, col12: 2.299999952316284, col11: 1.3, col8: utc datetime: 2021-01-04T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 43, col10: true, col7: 2021-01-30, col4: 23, col5: 33, col2: \"张三\u0000\u0000\", col3: 13, col1: \"张三\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col6: -9223372036854775808, col10: false, col7: 2019-01-04, col4: 1111, col5: 22222, col2: \"abcdefgh\", col3: 4, col1: \"a\\tbd\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(7 :person_connector {col13: utc time: 16:10:10.000000, timezoneOffset: 0, col12: 2.5999999046325684, col11: 1.6, col8: utc datetime: 2021-01-07T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: true, col6: 46, col7: 2021-02-03, col4: 26, col5: 36, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 16, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 1111, col5: -2147483648, col2: \"abcdefgh\", col3: 7, col1: \"abg\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(8 :person_connector {col13: utc time: 17:10:10.000000, timezoneOffset: 0, col12: 2.700000047683716, col11: 1.7, col8: utc datetime: 2021-01-08T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 47, col7: 2021-02-04, col4: 27, col5: 37, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 17, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: 32767, col5: 0, col2: \"abcdefgh\", col3: 8, col1: \"abh\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(9 :person_connector {col13: utc time: 18:10:10.000000, timezoneOffset: 0, col12: 2.799999952316284, col11: 1.8, col8: utc datetime: 2021-01-09T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 48, col10: true, col7: 2021-02-05, col4: 28, col5: 38, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 18, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col6: 6412233, col10: false, 
col7: 2019-01-01, col4: -32767, col5: -32767, col2: \"abcdefgh\", col3: 9, col1: \"abi\"})]") - expectResultString.append( - "ColumnName: [v], Values: [(10 :person_connector {col13: utc time: 19:10:10.000000, timezoneOffset: 0, col12: 2.9000000953674316, col11: 1.9, col8: utc datetime: 2021-01-10T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 49, col10: false, col7: 2021-02-06, col4: 29, col5: 39, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 19, col1: \"Jina\"} :person {col13: utc time: 12:12:12.000000, timezoneOffset: 0, col12: 1.0, col11: 1.2, col8: utc datetime: 2019-01-01T12:12:12.000000, timezoneOffset: 0, col9: 435463424, col10: false, col6: 6412233, col7: 2019-01-01, col4: -32768, col5: 32767, col2: \"abcdefgh\", col3: 10, col1: \"abj\"})]") - - assert(resultString.containsSlice(expectResultString)) } test("write edge into test_write_string space with insert mode") { - val sparkConf = new SparkConf - sparkConf - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) - val spark = SparkSession - .builder() - .master("local") - .config(sparkConf) - .getOrCreate() - - val df = spark.read.option("header", true).csv("src/test/resources/edge.csv") - - val config = - NebulaConnectionConfig - .builder() - .withMetaAddress("127.0.0.1:9559") - .withGraphAddress("127.0.0.1:9669") - .withConenctionRetry(2) - .build() - val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig - .builder() - .withSpace("test_write_string") - .withEdge("friend") - .withSrcIdField("id1") - .withDstIdField("id2") - .withRankField("col3") - .withRankAsProperty(true) - .withBatch(5) - .build() - df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges() + SparkMock.writeEdge() - spark.stop() - - val addresses: List[Address] = List(new Address("127.0.0.1", 9559)) + val addresses: List[Address] = List(new Address("127.0.0.1", 9669)) val graphProvider = new GraphProvider(addresses) + graphProvider.switchSpace("root", "nebula", "test_write_string") val createIndexResult: ResultSet = graphProvider.submit( "use test_write_string; " - + "create edge index if not exists friend_index on friend(col1(20));") + + "create edge index if not exists friend_index on friend_connector(col1(20));") Thread.sleep(5000) - graphProvider.submit( - "use test_write_string; " - + "rebuild edge index friend_index;") + graphProvider.submit("rebuild edge index friend_index;") Thread.sleep(5000) val resultSet: ResultSet = - graphProvider.submit("use test_write_string;match (v:person) return v;") + graphProvider.submit( + "use test_write_string;match (v:person_connector)-[e:friend_connector] -> () return e;") assert(resultSet.getColumnNames.size() == 1) assert(resultSet.getRows.size() == 13) - val resultString: ListBuffer[String] = new ListBuffer[String] + for (i <- 0 until resultSet.getRows.size) { - resultString.append(resultSet.rowValues(i).toString) + println(resultSet.rowValues(i).toString) } - - val expectResultString: ListBuffer[String] = new ListBuffer[String] - expectResultString.append( - "ColumnName: [e], Values: [(2)-[:friend_connector@11{col13: utc time: 11:10:10.000000, timezoneOffset: 0, col12: 2.0999999046325684, col11: 1.1, col8: utc datetime: 2021-01-02T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 41, col10: false, col7: 2021-01-28, col4: 21, col5: 31, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 11, col1: \"Jina\"}]->(3)]") - expectResultString.append( - "ColumnName: [e], Values: 
[(6)-[:friend_connector@15{col13: utc time: 15:10:10.000000, timezoneOffset: 0, col12: 2.5, col11: 1.5, col8: utc datetime: 2021-01-06T12:10:10.000000, timezoneOffset: 0, col9: 0, col10: false, col6: 45, col7: 2021-02-02, col4: 25, col5: 35, col2: \"王五\u0000\u0000\", col3: 15, col1: \"王五\"}]->(7)]") - expectResultString.append( - "ColumnName: [e], Values: [(1)-[:friend_connector@10{col13: utc time: 10:10:10.000000, timezoneOffset: 0, col12: 2.0, col11: 1.0, col8: utc datetime: 2021-01-01T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 40, col10: true, col7: 2021-01-27, col4: 20, col5: 30, col2: \"tom\u0000\u0000\u0000\u0000\u0000\", col3: 10, col1: \"Tom\"}]->(2)]") - expectResultString.append( - "ColumnName: [e], Values: [(-1)-[:friend_connector@20{col13: utc time: 20:10:10.000000, timezoneOffset: 0, col12: 3.0, col11: 2.0, col8: utc datetime: 2021-02-11T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 50, col7: 2021-02-07, col4: 30, col5: 40, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 20, col1: \"Jina\"}]->(5)]") - expectResultString.append( - "ColumnName: [e], Values: [(5)-[:friend_connector@14{col13: utc time: 14:10:10.000000, timezoneOffset: 0, col12: 2.4000000953674316, col11: 1.4, col8: utc datetime: 2021-01-05T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 44, col7: 2021-02-01, col4: 24, col5: 34, col2: \"李四\u0000\u0000\", col3: 14, col1: \"李四\"}]->(6)]") - expectResultString.append( - "ColumnName: [e], Values: [(-3)-[:friend_connector@22{col13: utc time: 22:10:10.000000, timezoneOffset: 0, col12: 3.200000047683716, col11: 2.2, col8: utc datetime: 2021-04-13T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 52, col10: false, col7: 2021-02-09, col4: 32, col5: 42, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 22, col1: \"Jina\"}]->(7)]") - expectResultString.append( - "ColumnName: [e], Values: [(3)-[:friend_connector@12{col13: utc time: 12:10:10.000000, timezoneOffset: 0, col12: 2.200000047683716, col11: 1.2, col8: utc datetime: 2021-01-03T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 42, col10: false, col7: 2021-01-29, col4: 22, col5: 32, col2: \"Tim\u0000\u0000\u0000\u0000\u0000\", col3: 12, col1: \"Tim\"}]->(4)]") - expectResultString.append( - "ColumnName: [e], Values: [(-2)-[:friend_connector@21{col13: utc time: 21:10:10.000000, timezoneOffset: 0, col12: 3.0999999046325684, col11: 2.1, col8: utc datetime: 2021-03-12T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 51, col10: false, col7: 2021-02-08, col4: 31, col5: 41, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 21, col1: \"Jina\"}]->(6)]") - expectResultString.append( - "ColumnName: [e], Values: [(4)-[:friend_connector@13{col13: utc time: 13:10:10.000000, timezoneOffset: 0, col12: 2.299999952316284, col11: 1.3, col8: utc datetime: 2021-01-04T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: true, col6: 43, col7: 2021-01-30, col4: 23, col5: 33, col2: \"张三\u0000\u0000\", col3: 13, col1: \"张三\"}]->(5)]") - expectResultString.append( - "ColumnName: [e], Values: [(7)-[:friend_connector@16{col13: utc time: 16:10:10.000000, timezoneOffset: 0, col12: 2.5999999046325684, col11: 1.6, col8: utc datetime: 2021-01-07T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 46, col10: true, col7: 2021-02-03, col4: 26, col5: 36, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 16, col1: \"Jina\"}]->(1)]") - expectResultString.append( - "ColumnName: [e], Values: [(8)-[:friend_connector@17{col13: utc time: 17:10:10.000000, timezoneOffset: 0, 
col12: 2.700000047683716, col11: 1.7, col8: utc datetime: 2021-01-08T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col10: false, col6: 47, col7: 2021-02-04, col4: 27, col5: 37, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 17, col1: \"Jina\"}]->(1)]") - expectResultString.append( - "ColumnName: [e], Values: [(9)-[:friend_connector@18{col13: utc time: 18:10:10.000000, timezoneOffset: 0, col12: 2.799999952316284, col11: 1.8, col8: utc datetime: 2021-01-09T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 48, col10: true, col7: 2021-02-05, col4: 28, col5: 38, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 18, col1: \"Jina\"}]->(1)]") - expectResultString.append( - "ColumnName: [e], Values: [(10)-[:friend_connector@19{col13: utc time: 19:10:10.000000, timezoneOffset: 0, col12: 2.9000000953674316, col11: 1.9, col8: utc datetime: 2021-01-10T12:10:10.000000, timezoneOffset: 0, col9: 43535232, col6: 49, col10: false, col7: 2021-02-06, col4: 29, col5: 39, col2: \"Jina\u0000\u0000\u0000\u0000\", col3: 19, col1: \"Jina\"}]->(2)]") - assert(resultString.containsSlice(expectResultString)) } } From d992edaa53f73069d5c2a60ba5eb72066d184735 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 19 Oct 2021 16:50:19 +0800 Subject: [PATCH 6/6] improve test code --- .../connector/PartitionUtilsSuite.scala | 106 +++--------------- 1 file changed, 18 insertions(+), 88 deletions(-) diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala index dc46d8bb..c6a95507 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/PartitionUtilsSuite.scala @@ -16,102 +16,32 @@ class PartitionUtilsSuite extends AnyFunSuite { test("getScanParts: nebula part is the same with spark partition") { val nebulaPart: Int = 10 - val partsForIndex0 = PartitionUtils.getScanParts(1, nebulaPart, partition) - assert(partsForIndex0.size == 1) - assert(partsForIndex0.head == 1) - - val partsForIndex1 = PartitionUtils.getScanParts(2, nebulaPart, partition) - assert(partsForIndex1.head == 2) - - val partsForIndex2 = PartitionUtils.getScanParts(3, nebulaPart, partition) - assert(partsForIndex2.head == 3) - - val partsForIndex3 = PartitionUtils.getScanParts(4, nebulaPart, partition) - assert(partsForIndex3.head == 4) - - val partsForIndex4 = PartitionUtils.getScanParts(5, nebulaPart, partition) - assert(partsForIndex4.head == 5) - - val partsForIndex5 = PartitionUtils.getScanParts(6, nebulaPart, partition) - assert(partsForIndex5.head == 6) - - val partsForIndex6 = PartitionUtils.getScanParts(7, nebulaPart, partition) - assert(partsForIndex6.head == 7) - - val partsForIndex7 = PartitionUtils.getScanParts(8, nebulaPart, partition) - assert(partsForIndex7.head == 8) - - val partsForIndex8 = PartitionUtils.getScanParts(9, nebulaPart, partition) - assert(partsForIndex8.head == 9) - - val partsForIndex9 = PartitionUtils.getScanParts(10, nebulaPart, partition) - assert(partsForIndex9.head == 10) + for (i <- 1 to 10) { + val partsForIndex = PartitionUtils.getScanParts(i, nebulaPart, partition) + assert(partsForIndex.size == 1) + assert(partsForIndex.head == i) + } } test("getScanParts: nebula part is more than spark partition") { val nebulaPart: Int = 20 - val partsForIndex0 = PartitionUtils.getScanParts(1, nebulaPart, partition) - assert(partsForIndex0.contains(1) && 
partsForIndex0.contains(11)) - - val partsForIndex1 = PartitionUtils.getScanParts(2, nebulaPart, partition) - assert(partsForIndex1.contains(2) && partsForIndex1.contains(12)) - - val partsForIndex2 = PartitionUtils.getScanParts(3, nebulaPart, partition) - assert(partsForIndex2.contains(3) && partsForIndex2.contains(13)) - - val partsForIndex3 = PartitionUtils.getScanParts(4, nebulaPart, partition) - assert(partsForIndex3.contains(4) && partsForIndex3.contains(14)) - - val partsForIndex4 = PartitionUtils.getScanParts(5, nebulaPart, partition) - assert(partsForIndex4.contains(5) && partsForIndex4.contains(15)) - - val partsForIndex5 = PartitionUtils.getScanParts(6, nebulaPart, partition) - assert(partsForIndex5.contains(6) && partsForIndex5.contains(16)) - - val partsForIndex6 = PartitionUtils.getScanParts(7, nebulaPart, partition) - assert(partsForIndex6.contains(7) && partsForIndex6.contains(17)) - - val partsForIndex7 = PartitionUtils.getScanParts(8, nebulaPart, partition) - assert(partsForIndex7.contains(8) && partsForIndex7.contains(18)) - - val partsForIndex8 = PartitionUtils.getScanParts(9, nebulaPart, partition) - assert(partsForIndex8.contains(9) && partsForIndex8.contains(19)) - - val partsForIndex9 = PartitionUtils.getScanParts(10, nebulaPart, partition) - assert(partsForIndex9.contains(10) && partsForIndex9.contains(20)) + for (i <- 1 to 10) { + val partsForIndex = PartitionUtils.getScanParts(i, nebulaPart, partition) + assert(partsForIndex.contains(i) && partsForIndex.contains(i + 10)) + assert(partsForIndex.size == 2) + } } test("getScanParts: nebula part is less than spark partition") { val nebulaPart: Int = 5 - val partsForIndex0 = PartitionUtils.getScanParts(1, nebulaPart, partition) - assert(partsForIndex0.contains(1)) - - val partsForIndex1 = PartitionUtils.getScanParts(2, nebulaPart, partition) - assert(partsForIndex1.contains(2)) - - val partsForIndex2 = PartitionUtils.getScanParts(3, nebulaPart, partition) - assert(partsForIndex2.contains(3)) - - val partsForIndex3 = PartitionUtils.getScanParts(4, nebulaPart, partition) - assert(partsForIndex3.contains(4)) - - val partsForIndex4 = PartitionUtils.getScanParts(5, nebulaPart, partition) - assert(partsForIndex4.contains(5)) - - val partsForIndex5 = PartitionUtils.getScanParts(6, nebulaPart, partition) - assert(partsForIndex5.isEmpty) - - val partsForIndex6 = PartitionUtils.getScanParts(7, nebulaPart, partition) - assert(partsForIndex6.isEmpty) - - val partsForIndex7 = PartitionUtils.getScanParts(8, nebulaPart, partition) - assert(partsForIndex7.isEmpty) - - val partsForIndex8 = PartitionUtils.getScanParts(9, nebulaPart, partition) - assert(partsForIndex8.isEmpty) - - val partsForIndex9 = PartitionUtils.getScanParts(10, nebulaPart, partition) - assert(partsForIndex9.isEmpty) + for (i <- 1 to 5) { + val partsForIndex = PartitionUtils.getScanParts(i, nebulaPart, partition) + assert(partsForIndex.contains(i)) + } + for (j <- 6 to 10) { + val partsForIndex = PartitionUtils.getScanParts(j, nebulaPart, partition) + assert(partsForIndex.isEmpty) + } } }
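
For reference, the partition-distribution rule that the three PartitionUtilsSuite cases above exercise can be sketched as below. This is a minimal illustration only, assuming a simple stride-based assignment of Nebula parts to Spark partitions; PartitionUtilsSketch and its main method are hypothetical names for this note and are not the connector's actual implementation.

object PartitionUtilsSketch {

  // Hypothetical sketch: return the 1-based Nebula part ids scanned by the
  // 1-based Spark partition `index`, assuming parts are assigned by stride
  // (part index, index + sparkPartitionCount, index + 2 * sparkPartitionCount, ...).
  def getScanParts(index: Int, nebulaPartCount: Int, sparkPartitionCount: Int): List[Int] =
    (index to nebulaPartCount by sparkPartitionCount).toList

  def main(args: Array[String]): Unit = {
    // Nebula parts == Spark partitions: each index owns exactly its own part.
    assert(getScanParts(3, 10, 10) == List(3))
    // More Nebula parts than Spark partitions: index 3 also picks up part 13.
    assert(getScanParts(3, 20, 10) == List(3, 13))
    // Fewer Nebula parts than Spark partitions: high indexes scan nothing.
    assert(getScanParts(8, 5, 10) == List.empty[Int])
  }
}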