diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/SchemaManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/SchemaManager.scala new file mode 100644 index 00000000..19ebd6ec --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/SchemaManager.scala @@ -0,0 +1,59 @@ +package org.apache.s2graph.core.schema + + +trait SchemaManager { + def findColumnService(columnId: Int): Service + + def findServiceColumn(columnId: Int): ServiceColumn + + def findServiceColumnMeta(columnId: Int, metaSeq: Byte): ColumnMeta + + def findLabel(labelId: Int): Label + + def findLabelService(labelId: Int): Service + + def findLabelIndex(labelId: Int, indexSeq: Byte): LabelIndex + + def findLabelIndexLabelMetas(labelId: Int, indexSeq: Byte): Array[LabelMeta] + + def findLabelMetas(labelId: Int): Map[Byte, LabelMeta] + + def findLabelMeta(labelId: Int, metaSeq: Byte): LabelMeta + + def checkLabelExist(labelId: Int): Boolean + + def checkServiceColumnExist(columnId: Int): Boolean + + def findServiceColumn(serviceName: String, columnName: String): ServiceColumn + + def findColumnMetas(serviceName: String, columnName: String): Map[String, ColumnMeta] + + def findColumnMetas(column: ServiceColumn): Map[String, ColumnMeta] + + def findLabel(labelName: String): Label + + def findLabelService(labelName: String): Service + + def findLabelIndices(label: Label): Map[String, LabelIndex] + + def findLabelIndices(labelName: String): Map[String, LabelIndex] + + def findLabelIndex(labelName: String, indexName: String): LabelIndex + + def findLabelIndexLabelMetas(labelName: String, indexName: String): Array[LabelMeta] + + def findLabelMetas(labelName: String): Map[String, LabelMeta] + + def findLabelMeta(labelName: String, name: String): LabelMeta + + def checkLabelExist(labelName: String): Boolean + + def findSrcServiceColumn(label: Label): ServiceColumn + + def findTgtServiceColumn(label: Label): ServiceColumn + + def findSrcServiceColumn(labelName: String): ServiceColumn + + def findTgtServiceColumn(labelName: String): ServiceColumn +} + diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala index 46b4860e..d51ee77c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala @@ -22,6 +22,7 @@ package org.apache.s2graph.core.storage.serde object Serializable { val vertexCf = "v".getBytes("UTF-8") val edgeCf = "e".getBytes("UTF-8") + val snapshotEdgeCf = "s".getBytes("UTF-8") } trait Serializable[E] extends StorageSerializable[E] diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala index 1ad56b36..5cf9f9fd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala @@ -29,6 +29,18 @@ import play.api.libs.json.Json object VertexId extends HBaseDeserializable { import HBaseType._ + def fromBytesRaw(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (InnerValLike, Int, Int) = { + var pos = offset + GraphUtil.bytesForMurMurHash + + val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) + pos += numOfBytesUsed + val colId = Bytes.toInt(bytes, pos, 4) + + (innerId, colId, 
GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4) + } def fromBytes(bytes: Array[Byte], offset: Int, diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index 3d3821e4..ed5a75e8 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -167,13 +167,13 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { columnFamily: String = "e", batchSize: Int = 1000, labelMapping: Map[String, String] = Map.empty, - buildDegree: Boolean = false): RDD[Seq[Cell]] = { + buildDegree: Boolean = false): RDD[(ImmutableBytesWritable, Result)] = { val cf = Bytes.toBytes(columnFamily) val hbaseConfig = HBaseConfiguration.create(ss.sparkContext.hadoopConfiguration) hbaseConfig.set("hbase.rootdir", snapshotPath) - val initial = ss.sparkContext.parallelize(Seq.empty[Seq[Cell]]) + val initial = ss.sparkContext.parallelize(Seq.empty[(ImmutableBytesWritable, Result)]) tableNames.foldLeft(initial) { case (prev, tableName) => val scan = new Scan scan.addFamily(cf) @@ -186,7 +186,8 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { val current = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat], classOf[ImmutableBytesWritable], - classOf[Result]).map(_._2.listCells().asScala.toSeq) + classOf[Result]) +// .map(_._2.listCells().asScala.toSeq) prev ++ current } diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index dd6f41b7..cab97606 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -20,11 +20,15 @@ package org.apache.s2graph.s2jobs.task import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} -import org.apache.s2graph.core.GraphUtil +import org.apache.hadoop.hbase.KeyValue +import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.core.{GraphUtil, Management} import org.apache.s2graph.s2jobs.S2GraphHelper import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer} import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.wal.WalLog +import org.apache.s2graph.s2jobs.wal.utils.{SchemaUtil, SerializeUtil} import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger} import org.elasticsearch.spark.sql.EsSparkSQL @@ -219,38 +223,70 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider" private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = { - val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf) - val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load()) + /* + * overwrite HBASE + MetaStorage + LocalCache configuration from given option. + */ + val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ + TaskConf.parseLocalCacheConfigs(conf) + val config = Management.toConfig(mergedConf) + + /* + * initialize meta storage connection. 
+ * note that we connect to the meta storage only once, at the Spark driver, + * then build a schema manager, which is serializable, and broadcast it to the executors. + * + * the schema manager is responsible for translating between the logical representation and the physical representation. + * + */ + SchemaUtil.init(config) + val ss = df.sparkSession + val sc = ss.sparkContext // required for bulkload - val labelMapping = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_LABEL_MAPPING).map(GraphUtil.toLabelMapping).getOrElse(Map.empty) - val buildDegree = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_BUILD_DEGREE).map(_.toBoolean).getOrElse(false) - val autoEdgeCreate = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_AUTO_EDGE_CREATE).map(_.toBoolean).getOrElse(false) - val skipError = getConfigStringOpt(graphConfig, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false) + val labelMapping = getConfigStringOpt(config, S2_SINK_BULKLOAD_LABEL_MAPPING).map(GraphUtil.toLabelMapping).getOrElse(Map.empty) + val buildDegree = getConfigStringOpt(config, S2_SINK_BULKLOAD_BUILD_DEGREE).map(_.toBoolean).getOrElse(false) + val autoEdgeCreate = getConfigStringOpt(config, S2_SINK_BULKLOAD_AUTO_EDGE_CREATE).map(_.toBoolean).getOrElse(false) + val skipError = getConfigStringOpt(config, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false) - val zkQuorum = graphConfig.getString(HBaseConfigs.HBASE_ZOOKEEPER_QUORUM) - val tableName = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TABLE_NAME) + val zkQuorum = config.getString(HBaseConfigs.HBASE_ZOOKEEPER_QUORUM) + val tableName = config.getString(S2_SINK_BULKLOAD_HBASE_TABLE_NAME) - val numRegions = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_NUM_REGIONS).toInt - val outputPath = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TEMP_DIR) + val numRegions = config.getString(S2_SINK_BULKLOAD_HBASE_NUM_REGIONS).toInt + val outputPath = config.getString(S2_SINK_BULKLOAD_HBASE_TEMP_DIR) // optional. 
- val incrementalLoad = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD).map(_.toBoolean).getOrElse(false) - val compressionAlgorithm = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_COMPRESSION).getOrElse("lz4") + val incrementalLoad = getConfigStringOpt(config, S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD).map(_.toBoolean).getOrElse(false) + val compressionAlgorithm = getConfigStringOpt(config, S2_SINK_BULKLOAD_HBASE_COMPRESSION).getOrElse("lz4") val hbaseConfig = HFileGenerator.toHBaseConfig(zkQuorum, tableName) - val input = df.rdd + val schema = SchemaUtil.buildSchemaManager(Map.empty, Nil) + val schemaBCast = sc.broadcast(schema) + val tallSchemaVersions = Set(HBaseType.VERSION4) + + implicit val enc = Encoders.kryo[KeyValue] - val transformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree) + val kvs = df.mapPartitions { iter => + val schema = schemaBCast.value - implicit val reader = new RowBulkFormatReader - implicit val writer = new KeyValueWriter(autoEdgeCreate, skipError) + iter.flatMap { row => + val walLog = WalLog.fromRow(row) + + SerializeUtil.walToSKeyValues(walLog, schema, tallSchemaVersions) + .map(SerializeUtil.sKeyValueToKeyValue) + } + } - val kvs = transformer.transform(input) +// val input = df +// val transformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree) +// +// implicit val reader = new RowBulkFormatReader +// implicit val writer = new KeyValueWriter(autoEdgeCreate, skipError) +// +// val kvs = transformer.transform(input).flatMap(ls => ls) - HFileGenerator.generateHFile(df.sparkSession.sparkContext, graphConfig, - kvs.flatMap(ls => ls), hbaseConfig, tableName, + HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, + kvs.rdd, hbaseConfig, tableName, numRegions, outputPath, incrementalLoad, compressionAlgorithm ) diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala index 8e4e234c..45bedb0d 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -19,26 +19,26 @@ package org.apache.s2graph.s2jobs.task +import org.apache.s2graph.core.types.HBaseType import org.apache.s2graph.core.{JSONParser, Management} import org.apache.s2graph.s2jobs.Schema -import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer} -import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader -import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter +import org.apache.s2graph.s2jobs.loader.HFileGenerator +import org.apache.s2graph.s2jobs.wal.utils.{DeserializeUtil, SchemaUtil} import org.apache.spark.sql.{DataFrame, SparkSession} import play.api.libs.json.{JsObject, Json} - /** * Source * * @param conf */ -abstract class Source(override val conf:TaskConf) extends Task { - def toDF(ss:SparkSession):DataFrame +abstract class Source(override val conf: TaskConf) extends Task { + def toDF(ss: SparkSession): DataFrame } -class KafkaSource(conf:TaskConf) extends Source(conf) { +class KafkaSource(conf: TaskConf) extends Source(conf) { val DEFAULT_FORMAT = "raw" + override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe") def repartition(df: DataFrame, defaultParallelism: Int) = { @@ -51,7 +51,7 @@ class KafkaSource(conf:TaskConf) extends Source(conf) { } } - override def toDF(ss:SparkSession):DataFrame = { + override def toDF(ss: SparkSession): DataFrame = 
{ logger.info(s"${LOG_PREFIX} options: ${conf.options}") val format = conf.options.getOrElse("format", "raw") @@ -61,22 +61,22 @@ class KafkaSource(conf:TaskConf) extends Source(conf) { format match { case "raw" => partitionedDF case "json" => parseJsonSchema(ss, partitionedDF) -// case "custom" => parseCustomSchema(df) + // case "custom" => parseCustomSchema(df) case _ => logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. use default schema ") partitionedDF } } - def parseJsonSchema(ss:SparkSession, df:DataFrame):DataFrame = { + def parseJsonSchema(ss: SparkSession, df: DataFrame): DataFrame = { import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types.DataType import ss.implicits._ val schemaOpt = conf.options.get("schema") schemaOpt match { - case Some(schemaAsJson:String) => - val dataType:DataType = DataType.fromJson(schemaAsJson) + case Some(schemaAsJson: String) => + val dataType: DataType = DataType.fromJson(schemaAsJson) logger.debug(s"${LOG_PREFIX} schema : ${dataType.sql}") df.selectExpr("CAST(value AS STRING)") @@ -90,8 +90,9 @@ class KafkaSource(conf:TaskConf) extends Source(conf) { } } -class FileSource(conf:TaskConf) extends Source(conf) { +class FileSource(conf: TaskConf) extends Source(conf) { val DEFAULT_FORMAT = "parquet" + override def mandatoryOptions: Set[String] = Set("paths") override def toDF(ss: SparkSession): DataFrame = { @@ -119,7 +120,8 @@ class FileSource(conf:TaskConf) extends Source(conf) { } } -class HiveSource(conf:TaskConf) extends Source(conf) { + +class HiveSource(conf: TaskConf) extends Source(conf) { override def mandatoryOptions: Set[String] = Set("database", "table") override def toDF(ss: SparkSession): DataFrame = { @@ -132,40 +134,72 @@ class HiveSource(conf:TaskConf) extends Source(conf) { } class S2GraphSource(conf: TaskConf) extends Source(conf) { + import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._ + override def mandatoryOptions: Set[String] = Set( S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR, S2_SOURCE_BULKLOAD_RESTORE_PATH, - S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES + S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES, + S2_SOURCE_BULKLOAD_LABEL_NAMES ) override def toDF(ss: SparkSession): DataFrame = { + /* + * overwrite HBASE + MetaStorage + LocalCache configuration from given option. + */ val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ TaskConf.parseLocalCacheConfigs(conf) val config = Management.toConfig(mergedConf) + /* + * initialize meta storage connection. + * note that we only connect meta storage once at spark driver, + * then build schema manager which is serializable and broadcast it to executors. + * + * schema manager responsible for translation between logical logical representation and physical representation. 
+ * + */ + SchemaUtil.init(config) + + val sc = ss.sparkContext + val snapshotPath = conf.options(S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR) val restorePath = conf.options(S2_SOURCE_BULKLOAD_RESTORE_PATH) val tableNames = conf.options(S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES).split(",") val columnFamily = conf.options.getOrElse(S2_SOURCE_BULKLOAD_HBASE_TABLE_CF, "e") val batchSize = conf.options.getOrElse(S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE, "1000").toInt + val labelNames = conf.options(S2_SOURCE_BULKLOAD_LABEL_NAMES).split(",").toSeq val labelMapping = Map.empty[String, String] val buildDegree = if (columnFamily == "v") false else conf.options.getOrElse(S2_SOURCE_BULKLOAD_BUILD_DEGREE, "false").toBoolean - val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge") - val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema + val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge").toLowerCase + + val resultSchema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath, restorePath, tableNames, columnFamily, batchSize, labelMapping, buildDegree) - implicit val reader = new S2GraphCellReader(elementType) - implicit val writer = new RowDataFrameWriter - - val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree) - val kvs = transformer.transform(cells) + val schema = SchemaUtil.buildEdgeDeserializeSchema(labelNames) + val schemaBCast = sc.broadcast(schema) + val tallSchemaVersions = Set(HBaseType.VERSION4) + val tgtDirection = 0 + + val results = cells.mapPartitions { iter => + val schema = schemaBCast.value + + iter.flatMap { case (_, result) => + elementType.toLowerCase match { + case "indexedge" => DeserializeUtil.indexEdgeResultToWals(result, schema, tallSchemaVersions, tgtDirection).map(DeserializeUtil.walLogToRow) + case "snapshotedge" => DeserializeUtil.snapshotEdgeResultToWals(result, schema, tallSchemaVersions).map(DeserializeUtil.walLogToRow) + case "vertex" => DeserializeUtil.vertexResultToWals(result, schema).map(DeserializeUtil.walVertexToRow) + case _ => throw new IllegalArgumentException(s"$elementType is not supported.") + } + } + } - ss.sqlContext.createDataFrame(kvs, schema) + ss.createDataFrame(results, resultSchema) } } \ No newline at end of file diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/MemorySchemaManager.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/MemorySchemaManager.scala new file mode 100644 index 00000000..b9d15961 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/MemorySchemaManager.scala @@ -0,0 +1,256 @@ +package org.apache.s2graph.s2jobs.wal + +import org.apache.s2graph.core.schema._ + +class MemorySchemaManager(serviceLs: Seq[Service], + serviceColumnLs: Seq[ServiceColumn], + columnMetaLs: Seq[ColumnMeta], + labelLs: Seq[Label], + labelIndexLs: Seq[LabelIndex], + labelMetaLs: Seq[LabelMeta]) extends SchemaManager { + private val services = serviceLs.map { service => service.serviceName -> service }.toMap + private val servicesInverted = serviceLs.map { service => service.id.get -> service }.toMap + + private val columnServiceMap: Map[ServiceColumn, Service] = serviceColumnLs.flatMap { column => + servicesInverted.get(column.serviceId).map(column -> _) + }.toMap + +// private val columnService = columnServiceMap.map(t => t._1.columnName -> t._2) + private val columnServiceInverted = columnServiceMap.map(t => t._1.id.get -> t._2) + +// private 
val columns = columnServiceMap.map(t => t._1.columnName -> t._1) + private val columnsInverted = columnServiceMap.map(t => t._1.id.get -> t._1) + + private val columnMetasMap: Map[ServiceColumn, Seq[ColumnMeta]] = + columnMetaLs + .groupBy(_.columnId) + .flatMap { case (columnId, metas) => + columnsInverted.get(columnId).map(_ -> (metas ++ ColumnMeta.reservedMetas)) + } + + private val columnMetas = columnMetasMap.map { case (column, metas) => + val innerMap = metas.map { meta => + meta.name -> meta + }.toMap + + column -> innerMap + } + + private val columnMetasInverted = columnMetasMap.map { case (column, metas) => + val innerMap = metas.map { meta => + meta.seq -> meta + }.toMap + + column.id.get -> innerMap + } + + private val labelsMap = labelLs.flatMap { label => + services.get(label.serviceName).map(label -> _) + }.toMap + + private val labelService = labelsMap.map(t => t._1.label -> t._2) + private val labelServiceInverted = labelsMap.map(t => t._1.id.get -> t._2) + + private val labels = labelsMap.map(t => t._1.label -> t._1) + private val labelsInverted = labelsMap.map(t => t._1.id.get -> t._1) + + private val labelMetasMap: Map[Label, Seq[LabelMeta]] = + labelMetaLs + .groupBy(_.labelId) + .flatMap { case (labelId, metas) => + labelsInverted.get(labelId).map(_ -> (metas ++ LabelMeta.reservedMetas)) + } + + private val labelMetas = labelMetasMap.map { case (label, metas) => + val innerMap = metas.map { meta => + meta.name -> meta + }.toMap + + label.label -> innerMap + } + private val labelMetasInverted = labelMetasMap.map { case (label, metas) => + val innerMap = metas.map { meta => + meta.seq -> meta + }.toMap + + label.id.get -> innerMap + } + + private val labelIndicesMap = labelIndexLs + .groupBy(_.labelId) + .flatMap { case (labelId, indices) => + labelsInverted.get(labelId).map(_ -> indices) + } + + private val labelIndices = labelIndicesMap.map { case (label, indices) => + val innerMap = indices.map { index => + index.name -> index + }.toMap + + label.label -> innerMap + } + + private val labelIndicesInverted = labelIndicesMap.map { case (label, indices) => + val innerMap = indices.map { index => + index.seq -> index + }.toMap + + label.id.get -> innerMap + } + + private val labelIndexLabelMetas = labelIndicesMap.map { case (label, indices) => + val innerMap = indices.map { index => + val metas = index.metaSeqs.map { seq => + labelMetasInverted(label.id.get)(seq) + }.toArray + + index.name -> metas + }.toMap + + label.label -> innerMap + } + + private val labelIndexLabelMetasInverted = labelIndicesMap.map { case (label, indices) => + val innerMap = indices.map { index => + val metas = index.metaSeqs.map { seq => + labelMetasInverted(label.id.get)(seq) + }.toArray + + index.seq -> metas + }.toMap + + label.id.get -> innerMap + } + + private val serviceIdColumnNameColumn = columnServiceMap.map { case (column, service) => + val key = service.id.get -> column.columnName + key -> column + } + + private val serviceNameColumnNameColumn = columnServiceMap.map { case (column, service) => + val key = service.serviceName -> column.columnName + key -> column + } + + def findColumnService(columnId: Int): Service = { + columnServiceInverted(columnId) + } + + def findServiceColumn(columnId: Int): ServiceColumn = { + columnsInverted(columnId) + } + + def findServiceColumnMeta(columnId: Int, metaSeq: Byte): ColumnMeta = { + columnMetasInverted(columnId)(metaSeq) + } + + def findLabel(labelId: Int): Label = { + labelsInverted(labelId) + } + + def findLabelService(labelId: Int): Service = { 
+ labelServiceInverted(labelId) + } + + def findLabelIndex(labelId: Int, indexSeq: Byte): LabelIndex = { + labelIndicesInverted(labelId)(indexSeq) + } + + def findLabelIndexLabelMetas(labelId: Int, indexSeq: Byte): Array[LabelMeta] = { + labelIndexLabelMetasInverted(labelId)(indexSeq) + } + + def findLabelMetas(labelId: Int): Map[Byte, LabelMeta] = { + labelMetasInverted(labelId) + } + + def findLabelMeta(labelId: Int, metaSeq: Byte): LabelMeta = { + labelMetasInverted(labelId)(metaSeq) + } + + def checkLabelExist(labelId: Int): Boolean = { + labelsInverted.contains(labelId) + } + + def checkServiceColumnExist(columnId: Int): Boolean = { + columnsInverted.contains(columnId) + } + + def findServiceColumn(serviceName: String, columnName: String): ServiceColumn = { + serviceNameColumnNameColumn(serviceName -> columnName) + } + + def findColumnMetas(serviceName: String, columnName: String): Map[String, ColumnMeta] = { + val column = findServiceColumn(serviceName, columnName) + findColumnMetas(column) + } + + def findColumnMetas(column: ServiceColumn): Map[String, ColumnMeta] = { + columnMetas(column) + } + + def findLabel(labelName: String): Label = { + labels(labelName) + } + + def findLabelService(labelName: String): Service = { + labelService(labelName) + } + + def findLabelIndices(label: Label): Map[String, LabelIndex] = { + findLabelIndices(label.label) + } + + def findLabelIndices(labelName: String): Map[String, LabelIndex] = { + labelIndices(labelName) + } + + def findLabelIndex(labelName: String, indexName: String): LabelIndex = { + labelIndices(labelName)(indexName) + } + + def findLabelIndexLabelMetas(labelName: String, indexName: String): Array[LabelMeta] = { + labelIndexLabelMetas(labelName)(indexName) + } + + def findLabelMetas(labelName: String): Map[String, LabelMeta] = { + labelMetas(labelName) + } + + def findLabelMeta(labelName: String, name: String): LabelMeta = { + labelMetas(labelName)(name) + } + + def checkLabelExist(labelName: String): Boolean = { + labels.contains(labelName) + } + + def findSrcServiceColumn(label: Label): ServiceColumn = { + serviceIdColumnNameColumn(label.srcServiceId -> label.srcColumnName) + } + + def findTgtServiceColumn(label: Label): ServiceColumn = { + serviceIdColumnNameColumn(label.tgtServiceId -> label.tgtColumnName) + } + + def findSrcServiceColumn(labelName: String): ServiceColumn = { + val label = findLabel(labelName) + serviceIdColumnNameColumn(label.srcServiceId -> label.srcColumnName) + } + + def findTgtServiceColumn(labelName: String): ServiceColumn = { + val label = findLabel(labelName) + serviceIdColumnNameColumn(label.tgtServiceId -> label.tgtColumnName) + } +} + +object MemorySchemaManager { + def apply(serviceLs: Seq[Service], + serviceColumnLs: Seq[ServiceColumn], + columnMetaLs: Seq[ColumnMeta], + labelLs: Seq[Label], + labelIndexLs: Seq[LabelIndex], + labelMetaLs: Seq[LabelMeta]): SchemaManager = { + new MemorySchemaManager(serviceLs, serviceColumnLs, columnMetaLs, labelLs, labelIndexLs, labelMetaLs) + } +} diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index ad696a9c..12641c27 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -1,7 +1,7 @@ package org.apache.s2graph.s2jobs.wal import com.google.common.hash.Hashing -import org.apache.s2graph.core.{GraphUtil, JSONParser} +import org.apache.s2graph.core.JSONParser import 
org.apache.s2graph.s2jobs.wal.process.params.AggregateParam import org.apache.s2graph.s2jobs.wal.transformer.Transformer import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue @@ -104,8 +104,8 @@ object WalLogAgg { private def filterPropsInner(walLogs: Seq[WalLog], - transformers: Seq[Transformer], - validFeatureHashKeys: Set[Long]): Seq[WalLog] = { + transformers: Seq[Transformer], + validFeatureHashKeys: Set[Long]): Seq[WalLog] = { walLogs.map { walLog => val fields = walLog.propsJson.fields.filter { case (propKey, propValue) => val filtered = transformers.flatMap { transformer => @@ -119,8 +119,8 @@ object WalLogAgg { } def filterProps(walLogAgg: WalLogAgg, - transformers: Seq[Transformer], - validFeatureHashKeys: Set[Long]) = { + transformers: Seq[Transformer], + validFeatureHashKeys: Set[Long]) = { val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys) val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys) @@ -176,6 +176,28 @@ case class WalLog(timestamp: Long, } } +object WalVertex { + def fromWalLog(walLog: WalLog): WalVertex = { + WalVertex( + walLog.timestamp, + walLog.operation, + walLog.elem, + walLog.from, + walLog.service, + walLog.label, + walLog.props + ) + } +} + +case class WalVertex(timestamp: Long, + operation: String, + elem: String, + id: String, + service: String, + column: String, + props: String) + object WalLog { val orderByTsAsc = Ordering.by[WalLog, Long](walLog => walLog.timestamp) @@ -203,6 +225,4 @@ object WalLog { WalLog(timestamp, operation, elem, from, to, service, label, props) } - - } diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/HFileParserUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/HFileParserUDF.scala new file mode 100644 index 00000000..7a73f191 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/HFileParserUDF.scala @@ -0,0 +1,83 @@ +//package org.apache.s2graph.s2jobs.wal.udfs +// +//import java.nio.ByteBuffer +// +//import com.typesafe.config.ConfigFactory +//import org.apache.hadoop.hbase.client.Result +//import org.apache.s2graph.core.Management +//import org.apache.s2graph.core.types.HBaseType +//import org.apache.s2graph.s2jobs.udfs.Udf +//import org.apache.s2graph.s2jobs.wal.LabelSchema +//import org.apache.s2graph.s2jobs.wal.utils.{DeserializeUtil, SchemaUtil} +//import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} +//import org.apache.spark.sql.{Row, SparkSession} +//import org.apache.spark.sql.functions.udf +// +// +//object HFileParserUDF { +// +// import org.apache.s2graph.s2jobs.wal.utils.DeserializeUtil._ +// import org.apache.s2graph.core.storage.SKeyValue +// +//// def resultParser(serDe: SerializerInstance, +//// labelSchema: LabelSchema, +//// tallSchemaVersion: Set[String]) = { +//// udf((row: Array[Byte], resultBytes: Array[Byte]) => { +//// val result = serDe.deserialize[Result](ByteBuffer.wrap(resultBytes)) +//// DeserializeUtil.resultToWals(row, result, labelSchema, tallSchemaVersion) +//// }) +//// } +// +//// def walParser(labelSchema: LabelSchema, +//// tallSchemaVersions: Set[String]) = { +//// udf((row: Array[Byte], kvsRow: Seq[Row]) => { +//// DeserializeUtil.indexEdgeResultToWalsV3IterTest(row, kvsRow, labelSchema, tallSchemaVersions) +//// }) +//// } +// +//// def indexEdgeParser(labelSchema: LabelSchema, +//// tallSchemaVersions: Set[String]) = { +//// udf((row: Row) => { +//// val skv = sKeyValueFromRow(row) +//// +//// 
indexEdgeKeyValueToRow(skv, None, labelSchema, tallSchemaVersions) +//// }) +//// } +//// +//// def snapshotEdgeParser(labelSchema: LabelSchema) = { +//// udf((row: Row) => { +//// val skv = sKeyValueFromRow(row) +//// snapshotEdgeKeyValueToRow(skv, None, labelSchema) +//// }) +//// } +//} +// +//class HFileParserUDF extends Udf { +// +// import HFileParserUDF._ +// import org.apache.s2graph.s2jobs.wal.utils.TaskConfUtil._ +// +// override def register(ss: SparkSession, name: String, options: Map[String, String]): Unit = { +// val mergedConfig = Management.toConfig(parseMetaStoreConfigs(options)) +// val config = ConfigFactory.load(mergedConfig) +// SchemaUtil.init(config) +// +//// val elementType = options.getOrElse("elementType", "indexedge") +// val labelNames = options.get("labelNames").map(_.split(",").toSeq).getOrElse(Nil) +// +// val labelSchema = SchemaUtil.buildLabelSchema(labelNames) +// +// val tallSchemaVersions = Set(HBaseType.VERSION4) +// +//// val f = elementType.toLowerCase match { +//// case "indexedge" => indexEdgeParser(labelSchema, tallSchemaVersions) +//// case "snapshotedge" => snapshotEdgeParser(labelSchema) +//// case _ => throw new IllegalArgumentException(s"$elementType is not supported.") +//// } +// val f = walParser(labelSchema, tallSchemaVersions) +//// val serDe = new KryoSerializer(ss.sparkContext.getConf).newInstance() +//// val f = resultParser(serDe, labelSchema, tallSchemaVersions) +// +// ss.udf.register(name, f) +// } +//} \ No newline at end of file diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/IsEmptyByteArrayUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/IsEmptyByteArrayUDF.scala new file mode 100644 index 00000000..3c426eaf --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/IsEmptyByteArrayUDF.scala @@ -0,0 +1,16 @@ +package org.apache.s2graph.s2jobs.wal.udfs + +import org.apache.s2graph.s2jobs.udfs.Udf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.udf + + +class IsEmptyByteArrayUDF extends Udf { + override def register(ss: SparkSession, name: String, options: Map[String, String]): Unit = { + val f = udf((bytes: Array[Byte]) => { + bytes.isEmpty + }) + + ss.udf.register(name, f) + } +} diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/DeserializeUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/DeserializeUtil.scala new file mode 100644 index 00000000..0e03080a --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/DeserializeUtil.scala @@ -0,0 +1,457 @@ +package org.apache.s2graph.s2jobs.wal.utils + +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{Cell, CellUtil} +import org.apache.s2graph.core.schema._ +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.s2graph.core.storage.serde.StorageDeserializable +import org.apache.s2graph.core.storage.serde.StorageDeserializable.bytesToInt +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.{GraphUtil, JSONParser, S2Vertex} +import org.apache.s2graph.s2jobs.wal._ +import org.apache.spark.sql.Row +import play.api.libs.json.Json + +import scala.util.{Failure, Success, Try} + +object DeserializeUtil { + + private def statusCodeWithOp(byte: Byte): (Byte, Byte) = { + val statusCode = byte >> 4 + val op = byte & ((1 << 4) - 1) + (statusCode.toByte, op.toByte) + } + + private def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], 
offset: Int): (Byte, Boolean) = { + val byte = bytes(offset) + val isInverted = if ((byte & 1) != 0) true else false + val labelOrderSeq = byte >> 1 + (labelOrderSeq.toByte, isInverted) + } + + private def bytesToKeyValues(bytes: Array[Byte], + offset: Int, + length: Int, + schemaVer: String, + labelMetaMap: Map[Byte, LabelMeta]): (Array[(LabelMeta, InnerValLike)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(LabelMeta, InnerValLike)](len) + var i = 0 + while (i < len) { + val k = labelMetaMap(bytes(pos)) + pos += 1 + val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + private def bytesToKeyValuesWithTs(bytes: Array[Byte], + offset: Int, + schemaVer: String, + labelMetaMap: Map[Byte, LabelMeta]): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len) + var i = 0 + while (i < len) { + val k = labelMetaMap(bytes(pos)) + pos += 1 + val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + def sKeyValueFromRow(row: Row): SKeyValue = { + val table = row.getAs[Array[Byte]]("table") + val _row = row.getAs[Array[Byte]]("row") + val cf = row.getAs[Array[Byte]]("cf") + val qualifier = row.getAs[Array[Byte]]("qualifier") + val value = row.getAs[Array[Byte]]("value") + val timestamp = row.getAs[Long]("timestamp") + val operation = row.getAs[Int]("operation") + val durability = row.getAs[Boolean]("durability") + + SKeyValue(table, _row, cf, qualifier, value, timestamp, operation, durability) + } + + def cellToSKeyValue(cell: Cell): SKeyValue = { + new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier, + cell.getValue, cell.getTimestamp, SKeyValue.Default) + } + + def sKeyValueToCell(skv: SKeyValue): Cell = { + CellUtil.createCell(skv.row, skv.cf, skv.qualifier, skv.timestamp, 4.toByte, skv.value) + } + + case class RowV3Parsed(srcVertexId: VertexId, + labelWithDir: LabelWithDirection, + labelIdxSeq: Byte, + isInverted: Boolean, + pos: Int) + + case class QualifierV3Parsed(idxPropsArray: IndexedSeq[(LabelMeta, InnerValLike)], + tgtVertexId: VertexId, + op: Byte) + + case class SnapshotRowV3Parsed(srcVertexId: VertexId, + tgtVertexId: VertexId, + labelWithDir: LabelWithDirection, + labelIdxSeq: Byte, + isInverted: Boolean) + + def toRowV3Parsed(row: Array[Byte]): RowV3Parsed = { + var pos = 0 + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(row, pos, row.length, HBaseType.DEFAULT_VERSION) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(row, pos) + pos += 1 + + RowV3Parsed(srcVertexId, labelWithDir, labelIdxSeq, isInverted, pos) + } + + def toQualifierV4Parsed(row: Array[Byte], + offset: Int, + schemaVer: String, + labelId: Int, + labelIdxSeq: Byte, + schema: SchemaManager): Try[QualifierV3Parsed] = Try { + val (idxPropsRaw, endAt) = StorageDeserializable.bytesToProps(row, offset, schemaVer) + val pos = endAt + + val (tgtVertexIdRaw, tgtVertexIdLen) = + if (endAt == row.length - 1) (HBaseType.defaultTgtVertexId, 0) + else TargetVertexId.fromBytes(row, endAt, row.length - 1, schemaVer) + + val op = row.last 
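+      // descriptive note (derived from this parser and the matching serializer): in the tall (v4) schema the qualifier is folded into the row key, so the op byte is the row's last byte and the target vertex id (when present) sits between the index props and that byte.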
+ + val idxPropsArray = toIndexProps(idxPropsRaw, labelId, labelIdxSeq, schema) + + QualifierV3Parsed(idxPropsArray, tgtVertexIdRaw, op) + } + + def toQualifierV3Parsed(qualifier: Array[Byte], + schemaVer: String, + labelId: Int, + labelIdxSeq: Byte, + schema: SchemaManager): Try[QualifierV3Parsed] = Try { + val (idxPropsRaw, endAt) = + StorageDeserializable.bytesToProps(qualifier, 0, schemaVer) + + var pos = endAt + + val (tgtVertexIdRaw, tgtVertexIdLen) = + if (endAt == qualifier.length) (HBaseType.defaultTgtVertexId, 0) + else TargetVertexId.fromBytes(qualifier, endAt, qualifier.length, schemaVer) + + pos += tgtVertexIdLen + + val op = + if (qualifier.length == pos) GraphUtil.defaultOpByte + else qualifier.last + + val idxPropsArray = toIndexProps(idxPropsRaw, labelId, labelIdxSeq, schema) + + QualifierV3Parsed(idxPropsArray, tgtVertexIdRaw, op) + } + + def toIndexProps(idxPropsRaw: Array[(LabelMeta, InnerValLike)], + labelId: Int, + labelIdxSeq: Byte, + schema: SchemaManager): IndexedSeq[(LabelMeta, InnerValLike)] = { + val sortKeyTypesArray = schema.findLabelIndexLabelMetas(labelId, labelIdxSeq) + + val size = idxPropsRaw.length + (0 until size).map { ith => + val meta = sortKeyTypesArray(ith) + val (k, v) = idxPropsRaw(ith) + meta -> v + } + } + + def toMetaProps(value: Array[Byte], + labelId: Int, + op: Byte, + schemaVer: String, + schema: SchemaManager) = { + /* process props */ + + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = StorageDeserializable.bytesToLong(value, 0) + Array(LabelMeta.count -> InnerVal.withLong(countVal, schemaVer)) + } else { + val (props, _) = bytesToKeyValues(value, 0, value.length, schemaVer, schema.findLabelMetas(labelId)) + props + } + } + + def toSnapshotRowV3Parsed(row: Array[Byte], + qualifier: Array[Byte]): Try[SnapshotRowV3Parsed] = Try { + val labelWithDirByteLen = 4 + val labelIndexSeqWithIsInvertedByteLen = 1 + + val (srcVertexId, srcIdLen) = + SourceVertexId.fromBytes(row, 0, row.length, HBaseType.DEFAULT_VERSION) + + val isTallSchema = + (srcIdLen + labelWithDirByteLen + labelIndexSeqWithIsInvertedByteLen) != row.length + + val (tgtVertexId, pos) = + if (isTallSchema) { + val (tgtId, tgtBytesLen) = InnerVal.fromBytes(row, srcIdLen, row.length, HBaseType.DEFAULT_VERSION) + + (TargetVertexId(ServiceColumn.Default, tgtId), srcIdLen + tgtBytesLen) + } else { + val (tgtVertexId, _) = TargetVertexId.fromBytes(qualifier, 0, qualifier.length, HBaseType.DEFAULT_VERSION) + + (tgtVertexId, srcIdLen) + } + + val labelWithDir = LabelWithDirection(Bytes.toInt(row, pos, labelWithDirByteLen)) + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(row, pos + labelWithDirByteLen) + + SnapshotRowV3Parsed(srcVertexId, tgtVertexId, labelWithDir, labelIdxSeq, isInverted) + } + + def deserializeIndexEdgeCell(cell: Cell, + row: Array[Byte], + rowV3Parsed: RowV3Parsed, + schema: SchemaManager, + tallSchemaVersions: Set[String]): Option[WalLog] = { + val labelWithDir = rowV3Parsed.labelWithDir + val labelId = labelWithDir.labelId + val labelIdxSeq = rowV3Parsed.labelIdxSeq + val srcVertexId = rowV3Parsed.srcVertexId + + val label = schema.findLabel(labelId) + val schemaVer = label.schemaVersion + val isTallSchema = tallSchemaVersions(label.schemaVersion) + + val qualifier = cell.getQualifier + + val isDegree = if (isTallSchema) rowV3Parsed.pos == row.length else qualifier.isEmpty + val validV3Qualifier = !qualifier.isEmpty + + if (isDegree) None + else if (!isTallSchema && !validV3Qualifier) 
None + else { + val qualifierParsedTry = + if (isTallSchema) toQualifierV4Parsed(row, rowV3Parsed.pos, schemaVer, labelId, labelIdxSeq, schema) + else toQualifierV3Parsed(qualifier, schemaVer, labelId, labelIdxSeq, schema) + + qualifierParsedTry.toOption.map { qualifierParsed => + val tgtVertexId = qualifierParsed.tgtVertexId + val idxPropsArray = qualifierParsed.idxPropsArray + val op = qualifierParsed.op + + val value = cell.getValue + + val metaPropsArray = toMetaProps(value, labelId, op, schemaVer, schema) + + val mergedProps = (idxPropsArray ++ metaPropsArray).toMap + val tsInnerVal = mergedProps(LabelMeta.timestamp) + val propsJson = for { + (labelMeta, innerValLike) <- mergedProps + jsValue <- JSONParser.innerValToJsValue(innerValLike, labelMeta.dataType) + } yield { + labelMeta.name -> jsValue + } + + val tgtVertexIdInnerId = mergedProps.getOrElse(LabelMeta.to, tgtVertexId.innerId) + + WalLog( + tsInnerVal.toIdString().toLong, + GraphUtil.fromOp(op), + "edge", + srcVertexId.innerId.toIdString(), + tgtVertexIdInnerId.toIdString(), + schema.findLabelService(labelWithDir.labelId).serviceName, + label.label, + Json.toJson(propsJson).toString() + ) + } + } + } + + def indexEdgeResultToWals(result: Result, + schema: SchemaManager, + tallSchemaVersions: Set[String], + tgtDirection: Int = 0): Seq[WalLog] = { + val rawCells = result.rawCells() + + if (rawCells.isEmpty) Nil + else { + val head = rawCells.head + val row = head.getRow + val rowV3Parsed = toRowV3Parsed(row) + + val labelWithDir = rowV3Parsed.labelWithDir + val labelId = labelWithDir.labelId + + val inValidRow = rowV3Parsed.isInverted || !schema.checkLabelExist(labelId) || labelWithDir.dir != tgtDirection + + if (inValidRow) Nil + else { + rawCells.flatMap { cell => + deserializeIndexEdgeCell(cell, row, rowV3Parsed, schema, tallSchemaVersions) + } + } + } + } + + + def snapshotEdgeResultToWals(result: Result, + schema: SchemaManager, + tallSchemaVersions: Set[String]): Seq[WalLog] = { + val rawCells = result.rawCells() + + if (rawCells.isEmpty) Nil + else { + val head = rawCells.head + val row = head.getRow + val qualifier = head.getQualifier + + toSnapshotRowV3Parsed(row, qualifier) match { + case Success(v) => + val SnapshotRowV3Parsed(srcVertexId, tgtVertexId, labelWithDir, _, isInverted) = v + + if (!isInverted) Nil + else { + val label = schema.findLabel(labelWithDir.labelId) + val schemaVer = label.schemaVersion + + rawCells.map { cell => + val value = cell.getValue + val (_, op) = statusCodeWithOp(value.head) + + val (props, _) = bytesToKeyValuesWithTs(value, 1, schemaVer, schema.findLabelMetas(labelWithDir.labelId)) + val kvsMap = props.toMap + val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal + + val propsJson = for { + (labelMeta, innerValLikeWithTs) <- props + jsValue <- JSONParser.innerValToJsValue(innerValLikeWithTs.innerVal, labelMeta.dataType) + } yield { + labelMeta.name -> jsValue + } + + WalLog( + tsInnerVal.toIdString().toLong, + GraphUtil.fromOp(op), + "edge", + srcVertexId.innerId.toIdString(), + tgtVertexId.innerId.toIdString(), + schema.findLabelService(labelWithDir.labelId).serviceName, + label.label, + Json.toJson(propsJson.toMap).toString() + ) + } + } + + case Failure(ex) => Nil + } + } + } + + def walLogToRow(walLog: WalLog): Row = { + Row.fromSeq( + Seq(walLog.timestamp, walLog.operation, walLog.elem, walLog.from, walLog.to, walLog.service, walLog.label, walLog.props) + ) + } + + def walVertexToRow(walVertex: WalVertex): Row = { + Row.fromSeq( + Seq(walVertex.timestamp, walVertex.operation, 
walVertex.elem, walVertex.id, walVertex.service, walVertex.column, walVertex.props) + ) + } + + def vertexResultToWals(result: Result, + schema: SchemaManager, + bytesToInt: (Array[Byte], Int) => Int = bytesToInt): Seq[WalVertex] = { + import scala.collection.mutable + + val rawCells = result.rawCells() + + if (rawCells.isEmpty) Nil + else { + val head = rawCells.head + val row = head.getRow + + val version = HBaseType.DEFAULT_VERSION + val (vertexInnerId, colId, len) = VertexId.fromBytesRaw(row, 0, row.length, version) + + val column = schema.findServiceColumn(colId) + val columnMetaSeqExistInRow = len + 1 == row.length + + var maxTs = Long.MinValue + val propsMap = mutable.Map.empty[ColumnMeta, InnerValLike] + val belongLabelIds = mutable.ListBuffer.empty[Int] + + rawCells.map { cell => + val qualifier = cell.getQualifier + + val propKey = + if (columnMetaSeqExistInRow) row.last + else { + if (qualifier.length == 1) qualifier.head.toInt + else bytesToInt(qualifier, 0) + } + + val ts = cell.getTimestamp + + if (ts > maxTs) maxTs = ts + + if (S2Vertex.isLabelId(propKey)) { + belongLabelIds += S2Vertex.toLabelId(propKey) + } else { + val v = cell.getValue + + val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) + val columnMeta = schema.findServiceColumnMeta(colId, propKey.toByte) + propsMap += (columnMeta -> value) + } + } + + val propsJson = for { + (k, v) <- propsMap + jsValue <- JSONParser.innerValToJsValue(v, k.dataType) + } yield { + k.name -> jsValue + } + + val ts = propsMap + .get(ColumnMeta.timestamp) + .map(_.toIdString().toLong) + .getOrElse(maxTs) + + val walVertex = WalVertex( + ts, + "insert", + "vertex", + vertexInnerId.toIdString(), + schema.findColumnService(colId).serviceName, + column.columnName, + Json.toJson(propsJson.toMap).toString() + ) + + Seq(walVertex) + } + } + +} diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/SchemaUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/SchemaUtil.scala new file mode 100644 index 00000000..c47874da --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/SchemaUtil.scala @@ -0,0 +1,115 @@ +package org.apache.s2graph.s2jobs.wal.utils + +import com.typesafe.config.Config +import org.apache.s2graph.core.schema._ +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.s2jobs.wal.MemorySchemaManager + + +object SchemaUtil { + + import scalikejdbc._ + + var initialized = false + + def init(config: Config): Boolean = { + synchronized { + if (!initialized) { + logger.info(s"[SCHEMA]: initializing...") + logger.info(s"[SCHEMA]: $config") + Schema.apply(config) + initialized = true + } + + initialized + } + } + + def labelSrcColumnMap: Map[String, String] = { + val ls = DB readOnly { implicit session => + sql"""SELECT label, src_column_name FROM labels""".map { rs => + rs.string("label") -> rs.string("src_column_name") + }.list().apply() + } + + ls.toMap + } + + def parseLabelMeta(rs: WrappedResultSet): LabelMeta = { + LabelMeta( + rs.intOpt("id"), + rs.int("label_id"), + rs.string("name"), + rs.byte("seq"), + rs.string("default_value"), + rs.string("data_type").toLowerCase, + false + // rs.boolean("store_in_global_index") + ) + } + + def labelMetaMap(label: Label): Map[Byte, LabelMeta] = { + val reserved = LabelMeta.reservedMetas.map { m => + if (m == LabelMeta.to) m.copy(dataType = label.tgtColumnType) + else if (m == LabelMeta.from) m.copy(dataType = label.srcColumnType) + else m + } + + val labelMetas = DB readOnly { implicit session => + sql"""SELECT 
* FROM label_metas WHERE label_id = ${label.id.get}""".map(parseLabelMeta).list().apply() + } + + (reserved ++ labelMetas).groupBy(_.seq).map { case (seq, labelMetas) => + seq -> labelMetas.head + } + } + + def buildSchemaManager(serviceNameColumnNames: Map[String, Seq[String]], + labelNames: Seq[String]): SchemaManager = { + + val _serviceLs = + if (serviceNameColumnNames.isEmpty) Service.findAll() + else { + serviceNameColumnNames.keys.toSeq.map { serviceName => + Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName not found.")) + } + } + + val labelLs = + if (labelNames.isEmpty) Label.findAll() + else { + labelNames.map { labelName => + Label.findByName(labelName).getOrElse(throw new IllegalArgumentException(s"$labelName not exist.")) + } + } + + val serviceLs = _serviceLs ++ labelLs.flatMap { label => + Seq(label.srcService, label.tgtService) + }.distinct + + val serviceColumnLs = serviceLs.flatMap { service => + ServiceColumn.findByServiceId(service.id.get) + } + + val columnMetaLs = serviceColumnLs.flatMap { column => + column.metas + } + + val labelIndexLs = labelLs.flatMap { label => + label.indices + } + val labelMetaLs = labelLs.flatMap { label => + label.metaProps + } + + MemorySchemaManager(serviceLs, serviceColumnLs, columnMetaLs, labelLs, labelIndexLs, labelMetaLs) + } + + def buildVertexDeserializeSchema(serviceNameColumnNames: Map[String, Seq[String]]): SchemaManager = { + buildSchemaManager(serviceNameColumnNames, Nil) + } + + def buildEdgeDeserializeSchema(labelNames: Seq[String]): SchemaManager = { + buildSchemaManager(Map.empty, labelNames) + } +} diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/SerializeUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/SerializeUtil.scala new file mode 100644 index 00000000..bf5b217d --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/SerializeUtil.scala @@ -0,0 +1,318 @@ +package org.apache.s2graph.s2jobs.wal.utils + +import org.apache.hadoop.hbase.KeyValue +import org.apache.hadoop.hbase.KeyValue.Type +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema._ +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.types._ +import org.apache.s2graph.s2jobs.wal.{MemorySchemaManager, WalLog, WalVertex} +import play.api.libs.json.{JsObject, Json} + +object SerializeUtil { + + import org.apache.s2graph.core.storage.serde.StorageSerializable._ + + private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = { + val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) + + val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues + val indexEdgeKeyValues = relEdges.flatMap { edge => + edge.edgesWithIndex.flatMap { indexEdge => + s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues + } + } + + snapshotEdgeKeyValues ++ indexEdgeKeyValues + } + + def buildEdgeIndexInnerProps(walLog: WalLog, + innerProps: Map[LabelMeta, InnerValLike], + defaultProps: Map[LabelMeta, InnerValLike], + label: Label, + labelMetasInOrder: Array[LabelMeta]): Array[(LabelMeta, InnerValLike)] = { + labelMetasInOrder.map { labelMeta => + innerProps.get(labelMeta) match { + case None => + val defaultVal = JSONParser.toInnerVal(labelMeta.defaultValue, 
labelMeta.dataType, label.schemaVersion)
+          val innerVal = defaultProps.getOrElse(labelMeta, defaultVal)
+
+          labelMeta -> innerVal
+        case Some(innerVal) => labelMeta -> innerVal
+      }
+    }
+  }
+
+  def buildEdgeInnerDefaultProps(walLog: WalLog,
+                                 label: Label,
+                                 srcColumn: ServiceColumn,
+                                 tgtColumn: ServiceColumn): Map[LabelMeta, InnerValLike] = {
+    Map(
+      LabelMeta.timestamp -> InnerVal.withLong(walLog.timestamp, label.schemaVersion),
+      LabelMeta.from -> JSONParser.toInnerVal(walLog.from, srcColumn.columnType, srcColumn.schemaVersion),
+      LabelMeta.to -> JSONParser.toInnerVal(walLog.to, tgtColumn.columnType, tgtColumn.schemaVersion)
+    )
+  }
+
+  def buildEdgeInnerProps(walLog: WalLog,
+                          label: Label,
+                          schema: SchemaManager): Map[LabelMeta, InnerValLike] = {
+    val ts = walLog.timestamp
+    val schemaVer = label.schemaVersion
+    val tsInnerVal = InnerVal.withLong(ts, schemaVer)
+
+    val labelMetas = schema.findLabelMetas(walLog.label)
+    val props = walLog.propsJson.fields.flatMap { case (key, jsValue) =>
+      labelMetas.get(key).flatMap { labelMeta =>
+        JSONParser.jsValueToInnerVal(jsValue, labelMeta.dataType, schemaVer).map { innerVal =>
+          labelMeta -> innerVal
+        }
+      }
+    }.toMap
+
+    props ++ Map(LabelMeta.timestamp -> tsInnerVal)
+  }
+
+  def buildVertexInnerProps(walVertex: WalVertex,
+                            column: ServiceColumn,
+                            schema: SchemaManager): Map[ColumnMeta, InnerValLike] = {
+    val ts = walVertex.timestamp
+    val schemaVer = column.schemaVersion
+    val tsInnerVal = InnerVal.withLong(ts, schemaVer)
+
+    val columnMetas = schema.findColumnMetas(column)
+    val props = Json.parse(walVertex.props).as[JsObject].fields.flatMap { case (key, jsValue) =>
+      columnMetas.get(key).flatMap { columnMeta =>
+        JSONParser.jsValueToInnerVal(jsValue, columnMeta.dataType, schemaVer).map { innerVal =>
+          columnMeta -> innerVal
+        }
+      }
+    }.toMap
+
+    props ++ Map(ColumnMeta.timestamp -> tsInnerVal, ColumnMeta.lastModifiedAtColumn -> tsInnerVal)
+  }
+
+  case class PartialResult(label: Label,
+                           srcColumn: ServiceColumn,
+                           srcVertexId: VertexId,
+                           tgtColumn: ServiceColumn,
+                           tgtVertexId: VertexId,
+                           op: Byte,
+                           dir: Int,
+                           props: Map[LabelMeta, InnerValLike],
+                           defaultProps: Map[LabelMeta, InnerValLike]) {
+    val schemaVer = label.schemaVersion
+
+    // need to build Array[Byte] for SKeyValue
+    lazy val table = label.hTableName.getBytes("UTF-8")
+    lazy val cf = Serializable.edgeCf
+    lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
+    lazy val labelWithDirBytes = labelWithDir.bytes
+    lazy val srcIdBytes = VertexId.toSourceVertexId(srcVertexId).bytes
+    lazy val tgtIdBytes = VertexId.toTargetVertexId(tgtVertexId).bytes
+  }
+
+  def buildPartialResult(walLog: WalLog,
+                         schema: SchemaManager): PartialResult = {
+    val label = schema.findLabel(walLog.label)
+    val dir = 0
+
+    val srcColumn = schema.findSrcServiceColumn(label)
+    val srcInnerVal = JSONParser.toInnerVal(walLog.from, srcColumn.columnType, srcColumn.schemaVersion)
+    val srcVertexId = VertexId(srcColumn, srcInnerVal)
+
+    val tgtColumn = schema.findTgtServiceColumn(label)
+    val tgtInnerVal = JSONParser.toInnerVal(walLog.to, tgtColumn.columnType, tgtColumn.schemaVersion)
+    val tgtVertexId = VertexId(tgtColumn, tgtInnerVal)
+
+    val allProps = buildEdgeInnerProps(walLog, label, schema)
+    val defaultProps = buildEdgeInnerDefaultProps(walLog, label, srcColumn, tgtColumn)
+
+    val op = GraphUtil.toOp(walLog.operation).getOrElse(throw new IllegalArgumentException(s"${walLog.operation} operation is not supported."))
+
+    PartialResult(label, srcColumn, srcVertexId, tgtColumn, tgtVertexId, op, dir, allProps, defaultProps)
+  }
+
+  def walToIndexEdgeKeyValue(walLog: WalLog,
+                             schema: SchemaManager,
+                             tallSchemaVersions: Set[String]): Iterable[SKeyValue] = {
+    //TODO: Degree Edge is not considered here.
+    val pr = buildPartialResult(walLog, schema)
+    val label = pr.label
+    val srcIdBytes = pr.srcIdBytes
+    val tgtIdBytes = pr.tgtIdBytes
+    val labelWithDirBytes = pr.labelWithDirBytes
+    val allProps = pr.props
+    val defaultProps = pr.defaultProps
+    val op = pr.op
+    val table = pr.table
+    val cf = pr.cf
+    val isTallSchema = tallSchemaVersions(label.schemaVersion)
+
+    schema.findLabelIndices(label).map { case (indexName, index) =>
+      val labelMetasInOrder = schema.findLabelIndexLabelMetas(label.label, indexName)
+
+      val orderedProps = buildEdgeIndexInnerProps(walLog, allProps, defaultProps, label, labelMetasInOrder)
+      val idxPropsMap = orderedProps.toMap
+      val idxPropsBytes = propsToBytes(orderedProps)
+      val metaProps = allProps.filterKeys(m => !idxPropsMap.contains(m))
+
+      // for the tall schema, the qualifier bytes and the op byte are folded into the row key.
+      val row =
+        if (isTallSchema) {
+          val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(index.seq, isInverted = false)
+
+          val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+          val qualifier = idxPropsMap.get(LabelMeta.to) match {
+            case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+            case Some(_) => idxPropsBytes
+          }
+
+          val opByte =
+            if (op == GraphUtil.operations("incrementCount")) op
+            else GraphUtil.defaultOpByte
+
+          Bytes.add(row, qualifier, Array.fill(1)(opByte))
+        } else {
+          val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(index.seq, isInverted = false)
+          Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+        }
+
+      val qualifier =
+        if (isTallSchema) Array.empty[Byte]
+        else {
+          if (op == GraphUtil.operations("incrementCount")) {
+            Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(op))
+          } else {
+            idxPropsMap.get(LabelMeta.to) match {
+              case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+              case Some(_) => idxPropsBytes
+            }
+          }
+        }
+
+      val value = {
+        if (op == GraphUtil.operations("incrementCount")) {
+          longToBytes(allProps(LabelMeta.count).toString().toLong)
+        } else {
+          propsToKeyValues(metaProps.toSeq)
+        }
+      }
+
+      SKeyValue(table, row, cf, qualifier, value, walLog.timestamp, op, durability = true)
+    }
+  }
+
+  def walToSnapshotEdgeKeyValue(walLog: WalLog,
+                                schema: SchemaManager,
+                                tallSchemaVersions: Set[String]): Iterable[SKeyValue] = {
+    val ts = walLog.timestamp
+    val pr = buildPartialResult(walLog, schema)
+    val label = pr.label
+
+    val labelWithDirBytes = pr.labelWithDirBytes
+    val allProps = pr.props.map { case (labelMeta, innerVal) =>
+      labelMeta.seq -> InnerValLikeWithTs(innerVal, ts)
+    }
+
+    val op = pr.op
+    val table = pr.table
+    val cf = pr.cf
+    val statusCode = 0.toByte
+
+    val isTallSchema = tallSchemaVersions(label.schemaVersion)
+
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+    // the tall schema stores the (src, tgt) vertex id pair in the row key; the wide schema keeps the tgt id in the qualifier.
+    val rowFirstBytes =
+      if (isTallSchema) SourceAndTargetVertexIdPair(pr.srcVertexId.innerId, pr.tgtVertexId.innerId).bytes
+      else VertexId.toSourceVertexId(pr.srcVertexId).bytes
+
+    val row = Bytes.add(rowFirstBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+    val qualifier =
+      if (isTallSchema) Array.empty[Byte]
+      else pr.tgtIdBytes
+
+    val metaPropsBytes = HBaseSerializable.propsToKeyValuesWithTs(allProps.toSeq)
+
+    // pack statusCode into the upper 4 bits and op into the lower 4 bits of a single byte.
+    def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
+      val byte = ((statusCode << 4) | op).toByte
+      Array.fill(1)(byte)
+    }
+
+    val value = Bytes.add(statusCodeWithOp(statusCode, op), metaPropsBytes)
+
+    Seq(
+      SKeyValue(table, row, cf, qualifier, value, ts, op, durability = true)
+    )
+  }
+
+  def walToSKeyValues(walLog: WalLog,
+                      schema: SchemaManager,
+                      tallSchemaVersions: Set[String]): Iterable[SKeyValue] = {
+    walToIndexEdgeKeyValue(walLog, schema, tallSchemaVersions) ++
+      walToSnapshotEdgeKeyValue(walLog, schema, tallSchemaVersions)
+  }
+
+  def walVertexToSKeyValue(v: WalVertex,
+                           schema: SchemaManager,
+                           tallSchemaVersions: Set[String]): Iterable[SKeyValue] = {
+    val column = schema.findServiceColumn(v.service, v.column)
+    val service = schema.findColumnService(column.id.get)
+    val schemaVer = column.schemaVersion
+
+    val innerVal = JSONParser.toInnerVal(v.id, column.columnType, column.schemaVersion)
+    val vertexId = VertexId(column, innerVal)
+    val row = vertexId.bytes
+
+    val props = buildVertexInnerProps(v, column, schema)
+    //TODO: skip belongLabelIds
+
+    val table = service.hTableName.getBytes("UTF-8")
+    val cf = Serializable.vertexCf
+    val isTallSchema = tallSchemaVersions(schemaVer)
+
+    // one SKeyValue per property; the tall schema appends the property seq to the row key instead of using a qualifier.
+    props.map { case (columnMeta, innerVal) =>
+      val rowBytes =
+        if (isTallSchema) Bytes.add(row, Array.fill(1)(columnMeta.seq))
+        else row
+
+      val qualifier =
+        if (isTallSchema) Array.empty[Byte]
+        else intToBytes(columnMeta.seq)
+
+      val value = innerVal.bytes
+
+      SKeyValue(table, rowBytes, cf, qualifier, value, v.timestamp)
+    }
+  }
+
+  // main public interface.
+  def toSKeyValues(walLog: WalLog,
+                   schema: SchemaManager,
+                   tallSchemaVersions: Set[String]): Iterable[SKeyValue] = {
+    walLog.elem match {
+      case "vertex" | "v" => walVertexToSKeyValue(WalVertex.fromWalLog(walLog), schema, tallSchemaVersions)
+      case "edge" | "e" => walToSKeyValues(walLog, schema, tallSchemaVersions)
+      case _ => throw new IllegalArgumentException(s"$walLog ${walLog.elem} is not supported.")
+    }
+  }
+
+  def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = {
+    if (element.isInstanceOf[S2Edge]) {
+      val edge = element.asInstanceOf[S2Edge]
+      insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
+    } else if (element.isInstanceOf[S2Vertex]) {
+      val vertex = element.asInstanceOf[S2Vertex]
+      s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+    } else {
+      Nil
+    }
+  }
+
+  def sKeyValueToKeyValue(skv: SKeyValue): KeyValue = {
+    new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, Type.Put, skv.value)
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/TaskConfUtil.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/TaskConfUtil.scala
new file mode 100644
index 00000000..3b4d73c5
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/TaskConfUtil.scala
@@ -0,0 +1,18 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import org.apache.s2graph.core.S2GraphConfigs
+
+object TaskConfUtil {
+
+  def parseHBaseConfigs(options: Map[String, String]): Map[String, Any] = {
+    options.filterKeys(S2GraphConfigs.HBaseConfigs.DEFAULTS.keySet)
+  }
+
+  def parseMetaStoreConfigs(options: Map[String, String]): Map[String, Any] = {
+    options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet)
+  }
+
+  def parseLocalCacheConfigs(options: Map[String, String]): Map[String, Any] = {
+    options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
index 8e205e3f..d5b27bb5 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
@@ -33,6 +33,7 @@ object S2SourceConfigs {
   val S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.names"
   val S2_SOURCE_BULKLOAD_HBASE_TABLE_CF = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.cf"
   val S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE = s"$S2_SOURCE_BULKLOAD_PREFIX.scan.batch.size"
+  val S2_SOURCE_BULKLOAD_LABEL_NAMES = s"$S2_SOURCE_BULKLOAD_PREFIX.label.names"
 
   // BULKLOAD
   val S2_SOURCE_BULKLOAD_BUILD_DEGREE = s"$S2_SOURCE_BULKLOAD_PREFIX.build.degree"
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/utils/DeserializeUtilTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/utils/DeserializeUtilTest.scala
new file mode 100644
index 00000000..3d6e45ab
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/utils/DeserializeUtilTest.scala
@@ -0,0 +1,117 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import org.apache.hadoop.hbase.CellUtil
+import org.apache.hadoop.hbase.KeyValue.Type
+import org.apache.hadoop.hbase.client.Result
+import org.apache.s2graph.core.schema._
+import org.apache.s2graph.core.storage.SKeyValue
+import org.apache.s2graph.s2jobs.wal.{MemorySchemaManager, WalLog, WalVertex}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import play.api.libs.json.{JsObject, JsValue, Json}
+
+class DeserializeUtilTest extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  def initTestSchemaManager(serviceName: String,
+                            columnName: String,
+                            labelName: String) = {
+    val service = Service(Option(1), serviceName, "token", "cluster", "test", 0, None)
+    val serviceLs = Seq(service)
+    val serviceColumn = ServiceColumn(Option(1), service.id.get, columnName, "string", "v3", None)
+    val serviceColumnLs = Seq(serviceColumn)
+    val columnMeta = ColumnMeta(Option(1), serviceColumn.id.get, "age", 1, "integer", "-1")
+    val columnMetaLs = Seq(columnMeta)
+    val label = Label(Option(1), labelName,
+      service.id.get, serviceColumn.columnName, serviceColumn.columnType,
+      service.id.get, serviceColumn.columnName, serviceColumn.columnType,
+      true, service.serviceName, service.id.get, "strong",
+      "test", None, "v3", false, "gz", None
+    )
+    val labelLs = Seq(label)
+    val labelMeta = LabelMeta(Option(1), label.id.get, "score", 1, "0.0", "double")
+    val labelMetaLs = Seq(labelMeta)
+    val labelIndex = LabelIndex(Option(1), label.id.get, "pk", 1, Seq(labelMeta.seq), "", None, None)
+    val labelIndexLs = Seq(labelIndex)
+
+    MemorySchemaManager(serviceLs,
+      serviceColumnLs,
+      columnMetaLs,
+      labelLs,
+      labelIndexLs,
+      labelMetaLs
+    )
+  }
+
+  def checkEqual(walVertex: WalVertex, deserializedWalVertex: WalVertex): Boolean = {
+    val msg = Seq("=" * 100, walVertex, deserializedWalVertex, "=" * 100).mkString("\n")
+    println(msg)
+
+    val expectedProps = Json.parse(walVertex.props).as[JsObject]
+    val realProps = Json.parse(deserializedWalVertex.props).as[JsObject]
+    val isPropsSame = expectedProps.fieldSet.forall { case (k, expectedVal) =>
+      (realProps \ k).asOpt[JsValue].map { realVal => expectedVal == realVal }.getOrElse(false)
+    }
+
+    val isWalLogSame = walVertex.copy(props = "{}") == deserializedWalVertex.copy(props = "{}")
+
+    isWalLogSame && isPropsSame
+  }
+
+  def checkEqual(walLog: WalLog, deserializedWalLog: WalLog): Boolean = {
+    val msg = Seq("=" * 100, walLog, deserializedWalLog, "=" * 100).mkString("\n")
+    println(msg)
+
+    val expectedProps = Json.parse(walLog.props).as[JsObject]
+    val realProps = Json.parse(deserializedWalLog.props).as[JsObject]
+    val isPropsSame = expectedProps.fieldSet.forall { case (k, expectedVal) =>
+      (realProps \ k).asOpt[JsValue].map { realVal => expectedVal == realVal }.getOrElse(false)
+    }
+
+    val isWalLogSame = walLog.copy(props = "{}") == deserializedWalLog.copy(props = "{}")
+
+    isWalLogSame && isPropsSame
+  }
+
+  def createResult(kvs: Iterable[SKeyValue]): Result = {
+    val cells = kvs.map { kv =>
+      CellUtil.createCell(kv.row, kv.cf, kv.qualifier, kv.timestamp, Type.Put.getCode, kv.value)
+    }
+
+    Result.create(cells.toArray)
+  }
+
+  val schema = initTestSchemaManager("s1", "user", "l1")
+  val tallSchemaVersions = Set("v4")
+  val walLog =
+    WalLog(10L, "insert", "edge", "a", "x", "s1", "l1", """{"score": 20}""")
+
+  val walVertex =
+    WalVertex(10L, "insert", "vertex", "v1", "s1", "user", """{"age": 20}""")
+
+  test("test index edge serialize/deserialize") {
+    val kvs = SerializeUtil.walToIndexEdgeKeyValue(walLog, schema, tallSchemaVersions)
+    val result = createResult(kvs)
+    val wals = DeserializeUtil.indexEdgeResultToWals(result, schema, tallSchemaVersions)
+
+    wals.size shouldBe 1
+    checkEqual(walLog, wals.head) shouldBe true
+  }
+
+  test("test snapshot edge serialize/deserialize") {
+    val kvs = SerializeUtil.walToSnapshotEdgeKeyValue(walLog, schema, tallSchemaVersions)
+    val result = createResult(kvs)
+    val wals = DeserializeUtil.snapshotEdgeResultToWals(result, schema, tallSchemaVersions)
+
+    wals.size shouldBe 1
+    checkEqual(walLog, wals.head) shouldBe true
+  }
+
+  test("test vertex serialize/deserialize") {
+    val kvs = SerializeUtil.walVertexToSKeyValue(walVertex, schema, tallSchemaVersions)
+    val result = createResult(kvs)
+    val wals = DeserializeUtil.vertexResultToWals(result, schema)
+
+    wals.size shouldBe 1
+    checkEqual(walVertex, wals.head) shouldBe true
+  }
+}