diff --git a/conf-template/client_import/csv_datasource.conf b/conf-template/client_import/csv_datasource.conf
index 94985058..28ab62b2 100644
--- a/conf-template/client_import/csv_datasource.conf
+++ b/conf-template/client_import/csv_datasource.conf
@@ -65,6 +65,8 @@
       nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
       vertex: {
         field: csv-field-0
+        # add the prefix for vertex id value, eg: original id is 12345, and the real id will be: tag1_12345
+        prefix:"tag1"
         udf: {
           separator: "_"
           oldColNames: [parquet-field-0, parquet-field-1]
@@ -90,13 +92,17 @@
       path: "hdfs://ip:port/path/test.csv"
       fields: [csv-field-0, csv-field-1, csv-field-2]
       nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
-      source: csv-field-0
-      target: csv-field-1
-      ranking: csv-field-2
-      separator: ","
-      header: true
-      batch: 2000
-      partition: 60
-    }
+      source: {
+        field: csv-field-0
+        # add the prefix for source id value, eg: original id is 12345, and the real id will be: edge1_12345
+        prefix:"edge1"
+      }
+      target: csv-field-1
+      ranking: csv-field-2
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
   ]
 }
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 6a9ae6cf..b4c145cf 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -404,7 +404,9 @@ object Configs {
       // You can specified the vertex field name via the com.vesoft.exchange.common.config item `vertex`
       // If you want to qualified the key policy, you can wrap them into a block.
+      var prefix: String = null
       val vertexField = if (tagConfig.hasPath("vertex.field")) {
+        prefix = getOrElse(tagConfig, "vertex.prefix", null)
         tagConfig.getString("vertex.field")
       } else {
         tagConfig.getString("vertex")
       }
@@ -455,6 +457,7 @@ object Configs {
         nebulaFields,
         vertexField,
         policyOpt,
+        prefix,
         batch,
         partition,
         checkPointPath,
@@ -504,8 +507,10 @@ object Configs {
       val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig)
       LOG.info(s"Sink Config ${sourceConfig}")
+      var sourcePrefix: String = null
       val sourceField = if (!isGeo) {
         if (edgeConfig.hasPath("source.field")) {
+          sourcePrefix = getOrElse(edgeConfig, "source.prefix", null)
           edgeConfig.getString("source.field")
         } else {
           edgeConfig.getString("source")
         }
@@ -524,8 +529,9 @@ object Configs {
       } else {
         None
       }
-
+      var targetPrefix: String = null
       val targetField: String = if (edgeConfig.hasPath("target.field")) {
+        targetPrefix = getOrElse(edgeConfig, "target.prefix", null)
         edgeConfig.getString("target.field")
       } else {
         edgeConfig.getString("target")
@@ -591,9 +597,11 @@ object Configs {
         nebulaFields,
         sourceField,
         sourcePolicy,
+        sourcePrefix,
         ranking,
         targetField,
         targetPolicy,
+        targetPrefix,
         isGeo,
         latitude,
         longitude,
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
index 99d1e7b0..145fafc6 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
@@ -57,6 +57,7 @@ case class TagConfigEntry(override val name: String,
                           override val nebulaFields: List[String],
                           vertexField: String,
                           vertexPolicy: Option[KeyPolicy.Value],
+                          vertexPrefix: String,
                           override val batch: Int,
                           override val partition: Int,
                           override val checkPointPath: Option[String],
@@ -110,9 +111,11 @@ case class EdgeConfigEntry(override val name: String,
                           override val nebulaFields: List[String],
                           sourceField: String,
                           sourcePolicy: Option[KeyPolicy.Value],
+                          sourcePrefix: String,
                           rankingField: Option[String],
                           targetField: String,
                           targetPolicy: Option[KeyPolicy.Value],
+                          targetPrefix: String,
                           isGeo: Boolean,
                           latitude: Option[String],
                           longitude: Option[String],
diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala
index d09ef3d4..3cb82aa7 100644
--- a/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala
+++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala
@@ -87,6 +87,7 @@ class NebulaUtilsSuite {
                                       nebulaFields,
                                       "id",
                                       Some(KeyPolicy.UUID),
+                                      null,
                                       1,
                                       1,
                                       Some(""))
@@ -136,6 +137,7 @@ class NebulaUtilsSuite {
                                       wrongNebulaFields,
                                       "id",
                                       Some(KeyPolicy.UUID),
+                                      null,
                                       1,
                                       1,
                                       Some(""))
@@ -222,18 +224,18 @@ class NebulaUtilsSuite {
 
   @Test
   def getAddressFromString(): Unit = {
-    var addr = "127.0.0.1:9669"
+    var addr        = "127.0.0.1:9669"
     var hostAddress = NebulaUtils.getAddressFromString(addr)
     assert("127.0.0.1".equals(hostAddress.getHost))
-    assert(hostAddress.getPort==9669)
+    assert(hostAddress.getPort == 9669)
 
-    addr="localhost:9669"
+    addr = "localhost:9669"
     hostAddress = NebulaUtils.getAddressFromString(addr)
     assert("localhost".equals(hostAddress.getHost))
 
     addr = "www.baidu.com:22"
     hostAddress = NebulaUtils.getAddressFromString(addr)
-    assert(hostAddress.getPort==22)
+    assert(hostAddress.getPort == 22)
 
     addr = "[2023::2]:65535"
     hostAddress = NebulaUtils.getAddressFromString(addr)
@@ -242,7 +244,7 @@ class NebulaUtilsSuite {
     addr = "2023::3"
     hostAddress = NebulaUtils.getAddressFromString(addr)
    assert(hostAddress.getHost.equals("2023::3"))
-    assert(hostAddress.getPort== -1)
+    assert(hostAddress.getPort == -1)
 
     // bad address
     addr = "localhost:65536"
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index ce57508b..1b51bfa7 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -239,7 +239,9 @@ class EdgeProcessor(spark: SparkSession,
         s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
       false
     } else true
-    idFlag && policyFlag
+
+    val udfFlag = isVidStringType || policy.isEmpty || (edgeConfig.sourcePrefix == null && edgeConfig.targetPrefix == null)
+    idFlag && policyFlag && udfFlag
   }
 
   /**
@@ -254,13 +256,15 @@ class EdgeProcessor(spark: SparkSession,
                                    "source_field",
                                    row,
                                    edgeConfig.sourcePolicy,
-                                   isVidStringType)
+                                   isVidStringType,
+                                   edgeConfig.sourcePrefix)
 
     val targetField = processField(edgeConfig.targetField,
                                    "target_field",
                                    row,
                                    edgeConfig.targetPolicy,
-                                   isVidStringType)
+                                   isVidStringType,
+                                   edgeConfig.targetPrefix)
 
     val values = for {
       property <- fieldKeys if property.trim.length != 0
@@ -282,7 +286,8 @@ class EdgeProcessor(spark: SparkSession,
                                  fieldType: String,
                                  row: Row,
                                  policy: Option[KeyPolicy.Value],
-                                 isVidStringType: Boolean): String = {
+                                 isVidStringType: Boolean,
+                                 prefix: String): String = {
     var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
       val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
       val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
@@ -292,6 +297,9 @@ class EdgeProcessor(spark: SparkSession,
       val value = row.get(index).toString.trim
       if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
     }
+    if (prefix != null) {
+      fieldValue = prefix + "_" + fieldValue
+    }
     // process string type vid
     if (policy.isEmpty && isVidStringType) {
       fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index 7af987fc..3de3ff71 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -191,6 +191,9 @@ class VerticesProcessor(spark: SparkSession,
       printChoice(streamFlag, s"vertexId must exist and cannot be null, your row data is $row")
       return false
     }
+    if (!isVidStringType && (tagConfig.vertexPolicy.isEmpty && tagConfig.vertexPrefix != null)) {
+      printChoice(streamFlag, s"space vidType is int, does not support prefix for vid")
+    }
 
     val vertexId = row.get(index).toString.trim
     // process int type vid
@@ -223,6 +226,9 @@ class VerticesProcessor(spark: SparkSession,
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
+    if (tagConfig.vertexPrefix != null) {
+      vertexId = tagConfig.vertexPrefix + "_" + vertexId
+    }
 
     if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
       vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
index 6274b4c1..e78cb6f4 100644
--- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
+++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
@@ -84,9 +84,11 @@ class EdgeProcessorSuite {
                                           nebulaKeys,
                                           "src",
                                           None,
+                                          null,
                                           None,
                                           "dst",
                                           None,
+                                          null,
                                           false,
                                           None,
                                           None,
@@ -115,9 +117,11 @@ class EdgeProcessorSuite {
                                           nebulaKeys,
                                           "src",
                                           Some(KeyPolicy.HASH),
+                                          null,
                                           None,
                                           "dst",
                                           Some(KeyPolicy.HASH),
+                                          null,
                                           false,
                                           None,
                                           None,
@@ -145,6 +149,28 @@ class EdgeProcessorSuite {
     assert(edge.destination.equals("\"2\""))
     assert(edge.toString.equals(
       "Edge: \"1\"->\"2\"@0 values: \"\", \"fixedBob\", 12, 200, 1000, 100000, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00.100\"), time(\"12:00:00.100\"), 345436232, true, 12.01, 22.12, ST_GeogFromText(\"POINT(3 8)\")"))
+
+    val edgeConfigEntryWithPrefix = EdgeConfigEntry("friend",
+                                                    null,
+                                                    null,
+                                                    fieldKeys,
+                                                    nebulaKeys,
+                                                    "src",
+                                                    None,
+                                                    "src",
+                                                    None,
+                                                    "dst",
+                                                    None,
+                                                    "dst",
+                                                    false,
+                                                    None,
+                                                    None,
+                                                    10,
+                                                    10,
+                                                    None)
+    val edgeWithPrefix = processClazz.convertToEdge(row, edgeConfigEntryWithPrefix, true, fieldKeys, map)
+    assert(edgeWithPrefix.source.equals("\"src_1\""))
+    assert(edgeWithPrefix.destination.equals("\"dst_2\""))
   }
 
   @Test
diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
index 94c72dda..7fb90975 100644
--- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
+++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
@@ -79,7 +79,7 @@ class VerticesProcessorSuite {
     val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema)
     val intIdRow    = new GenericRowWithSchema(intIdValue.toArray, schema)
     val tagConfigEntry =
-      TagConfigEntry("person", null, null, List(), List(), "id", None, 10, 10, None)
+      TagConfigEntry("person", null, null, List(), List(), "id", None, null, 10, 10, None)
 
     // test for string id value without policy
     assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true))
@@ -96,7 +96,17 @@ class VerticesProcessorSuite {
 
     // test for string id value with policy
     val tagConfigEntryWithPolicy =
-      TagConfigEntry("person", null, null, List(), List(), "id", Some(KeyPolicy.HASH), 10, 10, None)
+      TagConfigEntry("person",
+                     null,
+                     null,
+                     List(),
+                     List(),
+                     "id",
+                     Some(KeyPolicy.HASH),
+                     null,
+                     10,
+                     10,
+                     None)
     assert(!processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, true, true))
     assertThrows[AssertionError](
       processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, false, true))
@@ -107,6 +117,23 @@ class VerticesProcessorSuite {
     assert(processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, false, false))
     assertThrows[AssertionError](
       processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, false, true))
+
+    // test for prefix for id value
+    val tagConfigEntryWithPrefix =
+      TagConfigEntry("person",
+                     null,
+                     null,
+                     List(),
+                     List(),
+                     "id",
+                     Some(KeyPolicy.HASH),
+                     "prefix",
+                     10,
+                     10,
+                     None)
+    assert(processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPrefix, false, false))
+    assertThrows[AssertionError](
+      processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPrefix, false, true))
   }
 
   @Test
@@ -117,6 +144,12 @@ class VerticesProcessorSuite {
     assert(vertex.vertexID.equals("\"1\""))
     assert(vertex.toString.equals(
       "Vertex ID: \"1\", Values: \"\", \"fixedBob\", 12, 200, 1000, 100000, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00.100\"), time(\"12:00:00.100\"), 345436232, true, 12.01, 22.12, ST_GeogFromText(\"POINT(3 8)\")"))
+
+    val tagConfigEntryWithPrefix =
+      TagConfigEntry("person", null, null, List(), List(), "id", None, "prefix", 10, 10, None)
+    val vertexWithPrefix =
+      processClazz.convertToVertex(row, tagConfigEntryWithPrefix, true, fieldKeys, map)
+    assert(vertexWithPrefix.vertexID.equals("\"prefix_1\""))
   }
 
   @Test
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 54372836..f2425602 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -258,7 +258,9 @@ class EdgeProcessor(spark: SparkSession,
         s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
       false
     } else true
-    idFlag && policyFlag
+
+    val udfFlag = isVidStringType || policy.isEmpty || (edgeConfig.sourcePrefix == null && edgeConfig.targetPrefix == null)
+    idFlag && policyFlag && udfFlag
   }
 
   /**
@@ -273,13 +275,15 @@ class EdgeProcessor(spark: SparkSession,
                                    "source_field",
                                    row,
                                    edgeConfig.sourcePolicy,
-                                   isVidStringType)
+                                   isVidStringType,
+                                   edgeConfig.sourcePrefix)
 
     val targetField = processField(edgeConfig.targetField,
                                    "target_field",
                                    row,
                                    edgeConfig.targetPolicy,
-                                   isVidStringType)
+                                   isVidStringType,
+                                   edgeConfig.targetPrefix)
 
     val values = for {
       property <- fieldKeys if property.trim.length != 0
@@ -301,7 +305,8 @@ class EdgeProcessor(spark: SparkSession,
                                  fieldType: String,
                                  row: Row,
                                  policy: Option[KeyPolicy.Value],
-                                 isVidStringType: Boolean): String = {
+                                 isVidStringType: Boolean,
+                                 prefix: String): String = {
     var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
       val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
       val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
@@ -311,6 +316,9 @@ class EdgeProcessor(spark: SparkSession,
       val value = row.get(index).toString.trim
       if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
     }
+    if (prefix != null) {
+      fieldValue = prefix + "_" + fieldValue
+    }
     // process string type vid
     if (policy.isEmpty && isVidStringType) {
       fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index f537aba4..507ceb08 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -205,6 +205,9 @@ class VerticesProcessor(spark: SparkSession,
       printChoice(streamFlag, s"vertexId must exist and cannot be null, your row data is $row")
       return false
     }
+    if (!isVidStringType && (tagConfig.vertexPolicy.isEmpty && tagConfig.vertexPrefix != null)) {
+      printChoice(streamFlag, s"space vidType is int, does not support prefix for vid")
+    }
 
     val vertexId = row.get(index).toString.trim
     // process int type vid
@@ -237,6 +240,9 @@ class VerticesProcessor(spark: SparkSession,
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
+    if (tagConfig.vertexPrefix != null) {
+      vertexId = tagConfig.vertexPrefix + "_" + vertexId
+    }
 
     if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
       vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
index 96524f2c..c2c79b70 100644
--- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
+++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
@@ -85,9 +85,11 @@ class EdgeProcessorSuite {
                                           nebulaKeys,
                                           "src",
                                           None,
+                                          null,
                                           None,
                                           "dst",
                                           None,
+                                          null,
                                           false,
                                           None,
                                           None,
@@ -116,9 +118,11 @@ class EdgeProcessorSuite {
                                           nebulaKeys,
                                           "src",
                                           Some(KeyPolicy.HASH),
+                                          null,
                                           None,
                                           "dst",
                                           Some(KeyPolicy.HASH),
+                                          null,
                                           false,
                                           None,
                                           None,
diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
index f06eda66..9a8dac6e 100644
--- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
+++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
@@ -77,7 +77,7 @@ class VerticesProcessorSuite {
     val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema)
     val intIdRow    = new GenericRowWithSchema(intIdValue.toArray, schema)
     val tagConfigEntry =
-      TagConfigEntry("person", null, null, List(), List(), "id", None, 10, 10, None)
+      TagConfigEntry("person", null, null, List(), List(), "id", None, null, 10, 10, None)
 
     // test for string id value without policy
     assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true))
@@ -94,7 +94,17 @@ class VerticesProcessorSuite {
 
     // test for string id value with policy
     val tagConfigEntryWithPolicy =
-      TagConfigEntry("person", null, null, List(), List(), "id", Some(KeyPolicy.HASH), 10, 10, None)
+      TagConfigEntry("person",
+                     null,
+                     null,
+                     List(),
+                     List(),
+                     "id",
+                     Some(KeyPolicy.HASH),
+                     null,
+                     10,
+                     10,
+                     None)
     assert(!processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, true, true))
     assertThrows[AssertionError](
       processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, false, true))
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index be0e3a2d..0b3c4629 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -257,7 +257,9 @@ class EdgeProcessor(spark: SparkSession,
         s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
       false
     } else true
-    idFlag && policyFlag
+
+    val udfFlag = isVidStringType || policy.isEmpty || (edgeConfig.sourcePrefix == null && edgeConfig.targetPrefix == null)
+    idFlag && policyFlag && udfFlag
   }
 
   /**
@@ -272,13 +274,15 @@ class EdgeProcessor(spark: SparkSession,
                                    "source_field",
                                    row,
                                    edgeConfig.sourcePolicy,
-                                   isVidStringType)
+                                   isVidStringType,
+                                   edgeConfig.sourcePrefix)
 
     val targetField = processField(edgeConfig.targetField,
                                    "target_field",
                                    row,
                                    edgeConfig.targetPolicy,
-                                   isVidStringType)
+                                   isVidStringType,
+                                   edgeConfig.targetPrefix)
 
     val values = for {
       property <- fieldKeys if property.trim.length != 0
@@ -300,7 +304,8 @@ class EdgeProcessor(spark: SparkSession,
                                  fieldType: String,
                                  row: Row,
                                  policy: Option[KeyPolicy.Value],
-                                 isVidStringType: Boolean): String = {
+                                 isVidStringType: Boolean,
+                                 prefix: String): String = {
     var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
       val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
       val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
@@ -310,6 +315,9 @@ class EdgeProcessor(spark: SparkSession,
       val value = row.get(index).toString.trim
       if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
     }
+    if (prefix != null) {
+      fieldValue = prefix + "_" + fieldValue
+    }
     // process string type vid
     if (policy.isEmpty && isVidStringType) {
       fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index 35fa7d2c..ee9a0958 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -210,6 +210,9 @@ class VerticesProcessor(spark: SparkSession,
       printChoice(streamFlag, s"vertexId must exist and cannot be null, your row data is $row")
       return false
     }
+    if (!isVidStringType && (tagConfig.vertexPolicy.isEmpty && tagConfig.vertexPrefix != null)) {
+      printChoice(streamFlag, s"space vidType is int, does not support prefix for vid")
+    }
 
     val vertexId = row.get(index).toString
     // process int type vid
@@ -242,6 +245,9 @@ class VerticesProcessor(spark: SparkSession,
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
+    if (tagConfig.vertexPrefix != null) {
+      vertexId = tagConfig.vertexPrefix + "_" + vertexId
+    }
 
     if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
       vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
index 6274b4c1..f4671529 100644
--- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
+++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala
@@ -84,9 +84,11 @@ class EdgeProcessorSuite {
                                           nebulaKeys,
                                           "src",
                                           None,
+                                          null,
                                           None,
                                           "dst",
                                           None,
+                                          null,
                                           false,
                                           None,
                                           None,
@@ -115,9 +117,11 @@ class EdgeProcessorSuite {
                                           nebulaKeys,
                                           "src",
                                           Some(KeyPolicy.HASH),
+                                          null,
                                           None,
                                           "dst",
                                           Some(KeyPolicy.HASH),
+                                          null,
                                           false,
                                           None,
                                           None,
diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
index 2e184eac..ed918654 100644
--- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
+++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala
@@ -76,7 +76,7 @@ class VerticesProcessorSuite {
     val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema)
     val intIdRow    = new GenericRowWithSchema(intIdValue.toArray, schema)
     val tagConfigEntry =
-      TagConfigEntry("person", null, null, List(), List(), "id", None, 10, 10, None)
+      TagConfigEntry("person", null, null, List(), List(), "id", None, null, 10, 10, None)
 
     // test for string id value without policy
     assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true))
@@ -93,7 +93,17 @@ class VerticesProcessorSuite {
 
     // test for string id value with policy
     val tagConfigEntryWithPolicy =
-      TagConfigEntry("person", null, null, List(), List(), "id", Some(KeyPolicy.HASH), 10, 10, None)
+      TagConfigEntry("person",
+                     null,
+                     null,
+                     List(),
+                     List(),
+                     "id",
+                     Some(KeyPolicy.HASH),
+                     null,
+                     10,
+                     10,
+                     None)
     assert(!processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, true, true))
     assertThrows[AssertionError](
       processClazz.isVertexValid(stringIdRow, tagConfigEntryWithPolicy, false, true))
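
The conf-template change above is the user-facing half of this patch: `vertex`, `source`, and `target` now accept an optional `prefix`, which the processors prepend to the id value as `prefix + "_" + id` before the usual quoting and escaping. As a minimal sketch of how a tag could opt in (the tag name, CSV path, and field names below are hypothetical and not taken from the patch), assuming a space whose vidType is a fixed string:

    {
      name: person
      type: {
        source: csv
        sink: client
      }
      path: "hdfs://ip:port/path/person.csv"
      fields: [csv-field-1, csv-field-2]
      nebula.fields: [name, age]
      vertex: {
        field: csv-field-0
        # a row whose id column is 12345 would be written as vertex "person_12345"
        prefix: "person"
      }
      header: true
      batch: 2000
      partition: 60
    }

As the new validity checks read, the prefix only makes sense for string-typed vids: a vertex prefix on an int-vidType space without a policy triggers the "space vidType is int, does not support prefix for vid" message, and an edge prefix is rejected when an int vidType is combined with a hash/uuid policy.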