Commit ad69495
add vertex without tag for sst (#63)
* add vertex without tag for sst

* add test for orphanKey
Nicole00 authored Jan 27, 2022
1 parent 1fc042b commit ad69495
Showing 6 changed files with 54 additions and 27 deletions.
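
What the change does: when generating SST files, encodeVertex now returns three byte arrays instead of two (an "orphan" vertex key that carries no tag id, the tagged vertex key, and the encoded tag value), and the processor flat-maps every vertex into two SST key-value rows, pairing the orphan key with an empty value. The same edit appears three times below, apparently once per Spark-version module of the repository, each time in VerticesProcessor and its test suite. Here is a minimal, self-contained sketch of the emission pattern; every name in it is an illustrative stand-in, not the actual Exchange API:

// Sketch only: this encodeVertex stands in for the codec calls in the real diff.
object OrphanKeySketch {
  def encodeVertex(vid: String): (Array[Byte], Array[Byte], Array[Byte]) = {
    val orphanKey = s"orphan:$vid".getBytes() // vertex key with no tag id
    val tagKey    = s"tag:$vid".getBytes()    // vertex key carrying the tag id
    val tagValue  = "encoded-props".getBytes()
    (orphanKey, tagKey, tagValue)
  }

  def main(args: Array[String]): Unit = {
    val emptyValue = java.nio.ByteBuffer.allocate(0).array()
    val kvs = List("1", "2")
      .map(encodeVertex)
      .flatMap { case (orphanKey, tagKey, tagValue) =>
        // one row for the bare vertex, one row for its tag data
        List((orphanKey, emptyValue), (tagKey, tagValue))
      }
    kvs.foreach { case (k, v) => println(s"${new String(k)} -> ${v.length} value bytes") }
  }
}

Pairing the orphan key with ByteBuffer.allocate(0).array() means that row carries no payload at all; it exists only so the storage engine can tell that the vertex does.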
@@ -5,7 +5,7 @@
 
 package com.vesoft.nebula.exchange.processor
 
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -119,14 +119,18 @@ class VerticesProcessor(spark: SparkSession,
 
     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem     = metaProvider.getTagItem(space, tagName)
+    val emptyValue  = ByteBuffer.allocate(0).array()
 
     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
 
     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -222,7 +226,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)
 
@@ -256,14 +260,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec     = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
@@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{Configs, TagConfigEntry}
 import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
 import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem}
 import org.apache.commons.codec.binary.Hex
+import org.apache.log4j.Logger
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{
   BooleanType,
@@ -32,6 +33,8 @@ import org.scalatest.Assertions.assertThrows
 import scala.collection.JavaConverters._
 
 class VerticesProcessorSuite {
+  private[this] lazy val LOG = Logger.getLogger(this.getClass)
+
   val config: Configs =
     Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
 
@@ -138,11 +141,14 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map     = getFieldType()
 
-    val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
+    val (orphanKey, key, value) =
+      processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
 
-    val keyHex   = Hex.encodeHexString(key)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex       = Hex.encodeHexString(key)
+    val orphanKeyHex = Hex.encodeHexString(orphanKey)
+    val valueHex     = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
+    assert(orphanKeyHex.equals("0706000031000000000000000000"))
   }
 
   private def getRow(): Row = {
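
The two hex constants asserted above decompose cleanly, and they differ in exactly the way the orphan-key idea suggests. The decoder below is a hedged sketch: the field boundaries (one key-type byte, three little-endian partition-id bytes, the vid padded to spaceVidLen, then an optional little-endian tag id) are inferred from the constants themselves and from the arguments passed to encodeVertex, not stated by the commit:

object KeyLayoutSketch {
  // Decodes the two keys asserted in the test. The layout is an assumption
  // inferred from the constants (vid "1", spaceVidLen 10, tag id 1), not
  // taken from NebulaGraph documentation.
  def decode(hex: String, spaceVidLen: Int): String = {
    val bytes   = hex.grouped(2).map(Integer.parseInt(_, 16)).toArray
    val keyType = bytes(0)                                      // 0x01 tagged, 0x07 orphan (assumed)
    val partId  = bytes(1) | (bytes(2) << 8) | (bytes(3) << 16) // 3 bytes, little-endian
    val vid     = bytes.slice(4, 4 + spaceVidLen).takeWhile(_ != 0).map(_.toChar).mkString
    val tail    = bytes.drop(4 + spaceVidLen)                   // tag id if present
    s"type=$keyType partId=$partId vid=$vid trailingBytes=${tail.length}"
  }

  def main(args: Array[String]): Unit = {
    println(decode("010600003100000000000000000001000000", 10)) // type=1 partId=6 vid=1 trailingBytes=4
    println(decode("0706000031000000000000000000", 10))          // type=7 partId=6 vid=1 trailingBytes=0
  }
}

So the orphan key is the tagged key minus its four-byte tag id, with a different value in the type byte: the vertex's identity stored with no tag attached.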
@@ -5,7 +5,7 @@
 
 package com.vesoft.nebula.exchange.processor
 
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -121,14 +121,18 @@ class VerticesProcessor(spark: SparkSession,
 
     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem     = metaProvider.getTagItem(space, tagName)
+    val emptyValue  = ByteBuffer.allocate(0).array()
 
     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
 
     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)
 
@@ -275,14 +279,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec     = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
@@ -139,11 +139,14 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map     = getFieldType()
 
-    val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
+    val (orphanKey, key, value) =
+      processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
 
-    val keyHex   = Hex.encodeHexString(key)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex       = Hex.encodeHexString(key)
+    val orphanKeyHex = Hex.encodeHexString(orphanKey)
+    val valueHex     = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
+    assert(orphanKeyHex.equals("0706000031000000000000000000"))
   }
 
   private def getRow(): Row = {
@@ -5,7 +5,7 @@
 
 package com.vesoft.nebula.exchange.processor
 
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -121,14 +121,18 @@ class VerticesProcessor(spark: SparkSession,
 
     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem     = metaProvider.getTagItem(space, tagName)
+    val emptyValue  = ByteBuffer.allocate(0).array()
 
     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
      .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
 
     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)
 
@@ -275,14 +279,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec     = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
@@ -138,11 +138,14 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map     = getFieldType()
 
-    val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
+    val (orphanKey, key, value) =
+      processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
 
-    val keyHex   = Hex.encodeHexString(key)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex       = Hex.encodeHexString(key)
+    val orphanKeyHex = Hex.encodeHexString(orphanKey)
+    val valueHex     = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
+    assert(orphanKeyHex.equals("0706000031000000000000000000"))
   }
 
   private def getRow(): Row = {
