diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index b14fe98e..2bcd7160 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -5,7 +5,7 @@ package com.vesoft.nebula.exchange.processor -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType} import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices} @@ -119,6 +119,7 @@ class VerticesProcessor(spark: SparkSession, val spaceVidLen = metaProvider.getSpaceVidLen(space) val tagItem = metaProvider.getTagItem(space, tagName) + val emptyValue = ByteBuffer.allocate(0).array() var sstKeyValueData = data .dropDuplicates(tagConfig.vertexField) @@ -126,7 +127,10 @@ class VerticesProcessor(spark: SparkSession, iter.map { row => encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap) } - }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) + }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY)) + .flatMap(line => { + List((line._1, emptyValue), (line._2, line._3)) + })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap if (tagConfig.repartitionWithNebula) { @@ -222,7 +226,7 @@ class VerticesProcessor(spark: SparkSession, vidType: VidType.Value, spaceVidLen: Int, tagItem: TagItem, - fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = { + fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = { // check if vertex id is valid, if not, throw AssertException isVertexValid(row, tagConfig, false, vidType == VidType.STRING) @@ -256,14 +260,15 @@ class VerticesProcessor(spark: SparkSession, } else { vertexId.getBytes() } - val codec = new NebulaCodecImpl() - val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val codec = new NebulaCodecImpl() + val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava) - (vertexKey, vertexValue) + (orphanVertexKey, vertexKey, vertexValue) } } diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index b3594321..88d5afa4 100644 --- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{Configs, TagConfigEntry} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex +import org.apache.log4j.Logger import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{ BooleanType, @@ -32,6 +33,8 @@ import org.scalatest.Assertions.assertThrows import scala.collection.JavaConverters._ class VerticesProcessorSuite { + private[this] lazy val LOG = Logger.getLogger(this.getClass) + val config: Configs = Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf")) @@ -138,11 +141,14 @@ class VerticesProcessorSuite { val tagItem = new TagItem(1, "person".getBytes(), -1, schema) val map = getFieldType() - val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map) + val (orphanKey, key, value) = + processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map) - val keyHex = Hex.encodeHexString(key) - val valueHex = Hex.encodeHexString(value) + val keyHex = Hex.encodeHexString(key) + val orphanKeyHex = Hex.encodeHexString(orphanKey) + val valueHex = Hex.encodeHexString(value) assert(keyHex.equals("010600003100000000000000000001000000")) + assert(orphanKeyHex.equals("0706000031000000000000000000")) } private def getRow(): Row = { diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 448a67fc..61a134b1 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -5,7 +5,7 @@ package com.vesoft.nebula.exchange.processor -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType} import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices} @@ -121,6 +121,7 @@ class VerticesProcessor(spark: SparkSession, val spaceVidLen = metaProvider.getSpaceVidLen(space) val tagItem = metaProvider.getTagItem(space, tagName) + val emptyValue = ByteBuffer.allocate(0).array() var sstKeyValueData = data .dropDuplicates(tagConfig.vertexField) @@ -128,7 +129,10 @@ class VerticesProcessor(spark: SparkSession, iter.map { row => encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap) } - }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) + }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY)) + .flatMap(line => { + List((line._1, emptyValue), (line._2, line._3)) + })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap if (tagConfig.repartitionWithNebula) { @@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession, vidType: VidType.Value, spaceVidLen: Int, tagItem: TagItem, - fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = { + fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = { // check if vertex id is valid, if not, throw AssertException isVertexValid(row, tagConfig, false, vidType == VidType.STRING) @@ -275,14 +279,15 @@ class VerticesProcessor(spark: SparkSession, } else { vertexId.getBytes() } - val codec = new NebulaCodecImpl() - val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val codec = new NebulaCodecImpl() + val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava) - (vertexKey, vertexValue) + (orphanVertexKey, vertexKey, vertexValue) } } diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index 1cdbaffb..58825553 100644 --- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -139,11 +139,14 @@ class VerticesProcessorSuite { val tagItem = new TagItem(1, "person".getBytes(), -1, schema) val map = getFieldType() - val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map) + val (orphanKey, key, value) = + processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map) - val keyHex = Hex.encodeHexString(key) - val valueHex = Hex.encodeHexString(value) + val keyHex = Hex.encodeHexString(key) + val orphenKeyHex = Hex.encodeHexString(orphanKey) + val valueHex = Hex.encodeHexString(value) assert(keyHex.equals("010600003100000000000000000001000000")) + assert(orphenKeyHex.equals("0706000031000000000000000000")) } private def getRow(): Row = { diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 8e22fcf0..2d262561 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -5,7 +5,7 @@ package com.vesoft.nebula.exchange.processor -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType} import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices} @@ -121,6 +121,7 @@ class VerticesProcessor(spark: SparkSession, val spaceVidLen = metaProvider.getSpaceVidLen(space) val tagItem = metaProvider.getTagItem(space, tagName) + val emptyValue = ByteBuffer.allocate(0).array() var sstKeyValueData = data .dropDuplicates(tagConfig.vertexField) @@ -128,7 +129,10 @@ class VerticesProcessor(spark: SparkSession, iter.map { row => encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap) } - }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) + }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY)) + .flatMap(line => { + List((line._1, emptyValue), (line._2, line._3)) + })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap if (tagConfig.repartitionWithNebula) { @@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession, vidType: VidType.Value, spaceVidLen: Int, tagItem: TagItem, - fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = { + fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = { // check if vertex id is valid, if not, throw AssertException isVertexValid(row, tagConfig, false, vidType == VidType.STRING) @@ -275,14 +279,15 @@ class VerticesProcessor(spark: SparkSession, } else { vertexId.getBytes() } - val codec = new NebulaCodecImpl() - val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val codec = new NebulaCodecImpl() + val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava) - (vertexKey, vertexValue) + (orphanVertexKey, vertexKey, vertexValue) } } diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index b3594321..548dc2f9 100644 --- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -138,11 +138,14 @@ class VerticesProcessorSuite { val tagItem = new TagItem(1, "person".getBytes(), -1, schema) val map = getFieldType() - val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map) + val (orphanKey, key, value) = + processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map) - val keyHex = Hex.encodeHexString(key) - val valueHex = Hex.encodeHexString(value) + val keyHex = Hex.encodeHexString(key) + val orphanKeyHex = Hex.encodeHexString(orphanKey) + val valueHex = Hex.encodeHexString(value) assert(keyHex.equals("010600003100000000000000000001000000")) + assert(orphanKeyHex.equals("0706000031000000000000000000")) } private def getRow(): Row = {