add vertex without tag for sst #63

Merged · 2 commits · Jan 27, 2022
VerticesProcessor.scala

@@ -5,7 +5,7 @@

 package com.vesoft.nebula.exchange.processor

-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}

 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -119,14 +119,18 @@ class VerticesProcessor(spark: SparkSession,

     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem     = metaProvider.getTagItem(space, tagName)
+    val emptyValue  = ByteBuffer.allocate(0).array()

     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))

     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
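Net effect of this hunk: every deduplicated vertex row now produces two SST key-value pairs instead of one. The first pairs the new orphan-vertex key with an empty value and records the bare vertex; the second pairs the usual tag key with the encoded properties. The orphan key is what lets a vertex land in the SST output even when it carries no tag, which is the point of the PR title. A minimal sketch of the expansion (plain Scala collections, no Spark; encode() stands in for encodeVertex(), and the byte payloads are fabricated placeholders):

// Sketch only, not Exchange code: models the mapPartitions + flatMap pipeline above.
object TwoPairsPerVertex {
  // encodeVertex now returns (orphanVertexKey, vertexKey, vertexValue)
  def encode(vid: String): (Array[Byte], Array[Byte], Array[Byte]) =
    (s"orphanKey:$vid".getBytes, s"tagKey:$vid".getBytes, s"props:$vid".getBytes)

  def main(args: Array[String]): Unit = {
    val emptyValue = java.nio.ByteBuffer.allocate(0).array()
    val sstRows = List("1", "2")
      .map(encode)
      .flatMap { case (orphanKey, tagKey, value) =>
        // one SST row that merely marks the vertex, one that carries the tag data
        List((orphanKey, emptyValue), (tagKey, value))
      }
    sstRows.foreach { case (k, v) => println(s"${new String(k)} -> ${v.length} byte(s)") }
  }
}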
@@ -222,7 +226,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -256,14 +260,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec     = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
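For context, both key builders come from the nebula-java client's codec. A hedged usage sketch of the two calls as they appear above; the import path is assumed from the client library, and vidLen, partId, and vid mirror the unit test below:

import com.vesoft.nebula.encoder.NebulaCodecImpl

object OrphanKeyShapes {
  def main(args: Array[String]): Unit = {
    val codec  = new NebulaCodecImpl()
    val vidLen = 10              // space vid length, as in the test
    val partId = 6               // partition id matching the test's expected hex
    val vid    = "1".getBytes()
    // tag key: 4-byte header + vid padded to vidLen + 4-byte tagId (18 bytes here)
    val tagKey = codec.vertexKey(vidLen, partId, vid, 1)
    // orphan vertex key: 4-byte header + vid padded to vidLen, no tagId (14 bytes here)
    val orphanKey = codec.orphanVertexKey(vidLen, partId, vid)
    println(s"tagKey=${tagKey.length} bytes, orphanKey=${orphanKey.length} bytes")
  }
}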
VerticesProcessorSuite.scala

@@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{Configs, TagConfigEntry}
 import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
 import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem}
 import org.apache.commons.codec.binary.Hex
+import org.apache.log4j.Logger
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{
   BooleanType,
@@ -32,6 +33,8 @@ import org.scalatest.Assertions.assertThrows
 import scala.collection.JavaConverters._

 class VerticesProcessorSuite {
+  private[this] lazy val LOG = Logger.getLogger(this.getClass)
+
   val config: Configs =
     Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))

@@ -138,11 +141,14 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map     = getFieldType()

-    val (key, value) = processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)
+    val (orphanKey, key, value) =
+      processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)

-    val keyHex   = Hex.encodeHexString(key)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex       = Hex.encodeHexString(key)
+    val orphanKeyHex = Hex.encodeHexString(orphanKey)
+    val valueHex     = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
+    assert(orphanKeyHex.equals("0706000031000000000000000000"))
   }

   private def getRow(): Row = {
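The two expected hex strings make the key layout easy to check by hand. Assuming the NebulaGraph 2.x storage key layout (a 4-byte little-endian header holding partId << 8 | keyType, then the vid zero-padded to the space's vid length, then a 4-byte tagId for tag keys only), here is a small self-contained decoder run over the test's own constants. The layout and the type constants (1 for a tag key, 7 for an orphan vertex key) are inferred from the asserted bytes, not taken from this diff:

object DecodeTestKeys {
  // Parse one key: 4-byte little-endian header, vidLen bytes of padded vid, optional tail.
  def decode(hex: String, vidLen: Int): Unit = {
    val bytes   = hex.grouped(2).map(Integer.parseInt(_, 16)).toArray
    val header  = bytes(0) | (bytes(1) << 8) | (bytes(2) << 16) | (bytes(3) << 24)
    val keyType = header & 0xff
    val partId  = header >>> 8
    val vid     = bytes.slice(4, 4 + vidLen).takeWhile(_ != 0).map(_.toChar).mkString
    val tail    = bytes.length - 4 - vidLen
    println(s"type=$keyType partId=$partId vid=$vid trailingBytes=$tail")
  }

  def main(args: Array[String]): Unit = {
    decode("010600003100000000000000000001000000", 10) // type=1 partId=6 vid=1 trailingBytes=4 (the tagId)
    decode("0706000031000000000000000000", 10)         // type=7 partId=6 vid=1 trailingBytes=0
  }
}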