This repository was archived by the owner on Feb 16, 2024. It is now read-only.
@@ -0,0 +1,59 @@
package org.apache.s2graph.core.schema


trait SchemaManager {
def findColumnService(columnId: Int): Service

def findServiceColumn(columnId: Int): ServiceColumn

def findServiceColumnMeta(columnId: Int, metaSeq: Byte): ColumnMeta

def findLabel(labelId: Int): Label

def findLabelService(labelId: Int): Service

def findLabelIndex(labelId: Int, indexSeq: Byte): LabelIndex

def findLabelIndexLabelMetas(labelId: Int, indexSeq: Byte): Array[LabelMeta]

def findLabelMetas(labelId: Int): Map[Byte, LabelMeta]

def findLabelMeta(labelId: Int, metaSeq: Byte): LabelMeta

def checkLabelExist(labelId: Int): Boolean

def checkServiceColumnExist(columnId: Int): Boolean

def findServiceColumn(serviceName: String, columnName: String): ServiceColumn

def findColumnMetas(serviceName: String, columnName: String): Map[String, ColumnMeta]

def findColumnMetas(column: ServiceColumn): Map[String, ColumnMeta]

def findLabel(labelName: String): Label

def findLabelService(labelName: String): Service

def findLabelIndices(label: Label): Map[String, LabelIndex]

def findLabelIndices(labelName: String): Map[String, LabelIndex]

def findLabelIndex(labelName: String, indexName: String): LabelIndex

def findLabelIndexLabelMetas(labelName: String, indexName: String): Array[LabelMeta]

def findLabelMetas(labelName: String): Map[String, LabelMeta]

def findLabelMeta(labelName: String, name: String): LabelMeta

def checkLabelExist(labelName: String): Boolean

def findSrcServiceColumn(label: Label): ServiceColumn

def findTgtServiceColumn(label: Label): ServiceColumn

def findSrcServiceColumn(labelName: String): ServiceColumn

def findTgtServiceColumn(labelName: String): ServiceColumn
}
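
SchemaManager exposes the same schema objects (services, columns, labels, indices, metas) by both id and name, which lets serialization code resolve everything it needs without touching the meta storage. As a usage sketch only, the name-based lookups compose like this; describeLabel is a hypothetical helper, not part of this change, and is assumed to live in the same org.apache.s2graph.core.schema package so the schema types are in scope:

// Hypothetical helper: resolve everything needed to describe one label by name.
def describeLabel(schema: SchemaManager, labelName: String): String = {
  val label   = schema.findLabel(labelName)            // logical label definition
  val service = schema.findLabelService(labelName)     // service that owns the label
  val srcCol  = schema.findSrcServiceColumn(labelName) // source vertex column
  val tgtCol  = schema.findTgtServiceColumn(labelName) // target vertex column
  val metas   = schema.findLabelMetas(labelName)       // property name -> LabelMeta
  s"$label: service=$service, src=$srcCol, tgt=$tgtCol, props=[${metas.keys.mkString(", ")}]"
}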

@@ -22,6 +22,7 @@ package org.apache.s2graph.core.storage.serde
object Serializable {
val vertexCf = "v".getBytes("UTF-8")
val edgeCf = "e".getBytes("UTF-8")
val snapshotEdgeCf = "s".getBytes("UTF-8")
}

trait Serializable[E] extends StorageSerializable[E]
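
Each element kind is stored under its own HBase column family, and this change gives snapshot edges a dedicated "s" family next to the existing vertex ("v") and index-edge ("e") families. The sketch below only illustrates that mapping; the ElementKind ADT and columnFamilyOf are illustrative helpers, not S2Graph types:

sealed trait ElementKind
case object VertexKind       extends ElementKind
case object IndexEdgeKind    extends ElementKind
case object SnapshotEdgeKind extends ElementKind

// Pick the HBase column-family bytes for an element kind, mirroring Serializable above.
def columnFamilyOf(kind: ElementKind): Array[Byte] = kind match {
  case VertexKind       => Serializable.vertexCf        // "v"
  case IndexEdgeKind    => Serializable.edgeCf          // "e"
  case SnapshotEdgeKind => Serializable.snapshotEdgeCf  // "s" (new in this change)
}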
12 changes: 12 additions & 0 deletions s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -29,6 +29,18 @@ import play.api.libs.json.Json
object VertexId extends HBaseDeserializable {
import HBaseType._

def fromBytesRaw(bytes: Array[Byte],
offset: Int,
len: Int,
version: String = DEFAULT_VERSION): (InnerValLike, Int, Int) = {
var pos = offset + GraphUtil.bytesForMurMurHash

val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
pos += numOfBytesUsed
val colId = Bytes.toInt(bytes, pos, 4)

(innerId, colId, GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4)
}

def fromBytes(bytes: Array[Byte],
offset: Int,
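fromBytesRaw walks the serialized vertex-id layout (a murmur-hash bucket prefix, the serialized inner id, then a 4-byte service column id) and also returns how many bytes it consumed, so callers can keep scanning a composite row key. The sketch below only illustrates that layout with plain ByteBuffer arithmetic; the 2-byte prefix width and the long-encoded inner id are simplifying assumptions, since the real encoding is version-dependent and handled by InnerVal.fromBytes:

import java.nio.ByteBuffer

val hashLen = 2                           // assumed murmur-hash prefix width
val buf = ByteBuffer.allocate(hashLen + 8 + 4)
buf.putShort(123.toShort)                 // murmur bucket of the inner id
buf.putLong(42L)                          // simplified inner-id payload
buf.putInt(7)                             // 4-byte service column id
val bytes = buf.array()

// Decode in the same order fromBytesRaw does: skip the hash, read the id, read the column id.
val innerId  = ByteBuffer.wrap(bytes, hashLen, 8).getLong     // 42
val columnId = ByteBuffer.wrap(bytes, hashLen + 8, 4).getInt  // 7
val consumed = hashLen + 8 + 4                                // total bytes used
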
@@ -167,13 +167,13 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
columnFamily: String = "e",
batchSize: Int = 1000,
labelMapping: Map[String, String] = Map.empty,
buildDegree: Boolean = false): RDD[Seq[Cell]] = {
buildDegree: Boolean = false): RDD[(ImmutableBytesWritable, Result)] = {
val cf = Bytes.toBytes(columnFamily)

val hbaseConfig = HBaseConfiguration.create(ss.sparkContext.hadoopConfiguration)
hbaseConfig.set("hbase.rootdir", snapshotPath)

val initial = ss.sparkContext.parallelize(Seq.empty[Seq[Cell]])
val initial = ss.sparkContext.parallelize(Seq.empty[(ImmutableBytesWritable, Result)])
tableNames.foldLeft(initial) { case (prev, tableName) =>
val scan = new Scan
scan.addFamily(cf)
@@ -186,7 +186,8 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
val current = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
classOf[TableSnapshotInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]).map(_._2.listCells().asScala.toSeq)
classOf[Result])
// .map(_._2.listCells().asScala.toSeq)

prev ++ current
}
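tableSnapshotDump now returns the raw (ImmutableBytesWritable, Result) pairs produced by TableSnapshotInputFormat instead of eagerly flattening each Result into its cells, leaving that decision to callers. A minimal sketch of recovering the previous shape, assuming rdd is the value returned by tableSnapshotDump:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD

// Re-derive the old RDD[Seq[Cell]] shape from the raw snapshot rows.
def toCells(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[Seq[Cell]] =
  rdd.map { case (_, result) => result.listCells().asScala.toSeq }
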
76 changes: 56 additions & 20 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,11 +20,15 @@
package org.apache.s2graph.s2jobs.task

import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
import org.apache.s2graph.core.GraphUtil
import org.apache.hadoop.hbase.KeyValue
import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.core.{GraphUtil, Management}
import org.apache.s2graph.s2jobs.S2GraphHelper
import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
import org.apache.s2graph.s2jobs.wal.WalLog
import org.apache.s2graph.s2jobs.wal.utils.{SchemaUtil, SerializeUtil}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
import org.elasticsearch.spark.sql.EsSparkSQL
@@ -219,38 +223,70 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"

private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
/*
* overwrite the HBase + MetaStorage + LocalCache configuration with the given options.
*/
val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
TaskConf.parseLocalCacheConfigs(conf)
val config = Management.toConfig(mergedConf)

/*
* initialize the meta storage connection.
* note that we connect to the meta storage only once, at the spark driver,
* then build a schema manager, which is serializable, and broadcast it to executors.
*
* the schema manager is responsible for translating between the logical representation and the physical representation.
*
*/
SchemaUtil.init(config)
val ss = df.sparkSession
val sc = ss.sparkContext

// required for bulkload
val labelMapping = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_LABEL_MAPPING).map(GraphUtil.toLabelMapping).getOrElse(Map.empty)
val buildDegree = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_BUILD_DEGREE).map(_.toBoolean).getOrElse(false)
val autoEdgeCreate = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_AUTO_EDGE_CREATE).map(_.toBoolean).getOrElse(false)
val skipError = getConfigStringOpt(graphConfig, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false)
val labelMapping = getConfigStringOpt(config, S2_SINK_BULKLOAD_LABEL_MAPPING).map(GraphUtil.toLabelMapping).getOrElse(Map.empty)
val buildDegree = getConfigStringOpt(config, S2_SINK_BULKLOAD_BUILD_DEGREE).map(_.toBoolean).getOrElse(false)
val autoEdgeCreate = getConfigStringOpt(config, S2_SINK_BULKLOAD_AUTO_EDGE_CREATE).map(_.toBoolean).getOrElse(false)
val skipError = getConfigStringOpt(config, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false)

val zkQuorum = graphConfig.getString(HBaseConfigs.HBASE_ZOOKEEPER_QUORUM)
val tableName = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TABLE_NAME)
val zkQuorum = config.getString(HBaseConfigs.HBASE_ZOOKEEPER_QUORUM)
val tableName = config.getString(S2_SINK_BULKLOAD_HBASE_TABLE_NAME)

val numRegions = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_NUM_REGIONS).toInt
val outputPath = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TEMP_DIR)
val numRegions = config.getString(S2_SINK_BULKLOAD_HBASE_NUM_REGIONS).toInt
val outputPath = config.getString(S2_SINK_BULKLOAD_HBASE_TEMP_DIR)

// optional.
val incrementalLoad = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD).map(_.toBoolean).getOrElse(false)
val compressionAlgorithm = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_COMPRESSION).getOrElse("lz4")
val incrementalLoad = getConfigStringOpt(config, S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD).map(_.toBoolean).getOrElse(false)
val compressionAlgorithm = getConfigStringOpt(config, S2_SINK_BULKLOAD_HBASE_COMPRESSION).getOrElse("lz4")

val hbaseConfig = HFileGenerator.toHBaseConfig(zkQuorum, tableName)

val input = df.rdd
val schema = SchemaUtil.buildSchemaManager(Map.empty, Nil)
val schemaBCast = sc.broadcast(schema)
val tallSchemaVersions = Set(HBaseType.VERSION4)

implicit val enc = Encoders.kryo[KeyValue]

val transformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree)
val kvs = df.mapPartitions { iter =>
val schema = schemaBCast.value

implicit val reader = new RowBulkFormatReader
implicit val writer = new KeyValueWriter(autoEdgeCreate, skipError)
iter.flatMap { row =>
val walLog = WalLog.fromRow(row)

SerializeUtil.walToSKeyValues(walLog, schema, tallSchemaVersions)
.map(SerializeUtil.sKeyValueToKeyValue)
}
}

val kvs = transformer.transform(input)
// val input = df
// val transformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree)
//
// implicit val reader = new RowBulkFormatReader
// implicit val writer = new KeyValueWriter(autoEdgeCreate, skipError)
//
// val kvs = transformer.transform(input).flatMap(ls => ls)

HFileGenerator.generateHFile(df.sparkSession.sparkContext, graphConfig,
kvs.flatMap(ls => ls), hbaseConfig, tableName,
HFileGenerator.generateHFile(df.sparkSession.sparkContext, config,
kvs.rdd, hbaseConfig, tableName,
numRegions, outputPath, incrementalLoad, compressionAlgorithm
)

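The rewritten sink follows a driver-side initialization pattern: connect to the meta storage once, build a serializable schema manager, broadcast it, and let each partition read it back before turning rows into KeyValues. A generic, self-contained sketch of that pattern; MiniSchema and loadSchema stand in for SchemaUtil.init plus SchemaUtil.buildSchemaManager and are not S2Graph APIs:

import org.apache.spark.sql.SparkSession

case class MiniSchema(labelIds: Map[String, Int])                 // serializable stand-in for SchemaManager

def loadSchema(): MiniSchema = MiniSchema(Map("friend_of" -> 1))  // driver-only work (e.g. a JDBC lookup)

val ss = SparkSession.builder().master("local[*]").appName("broadcast-schema").getOrCreate()
val schemaBCast = ss.sparkContext.broadcast(loadSchema())

val labelIds = ss.sparkContext
  .parallelize(Seq("friend_of", "friend_of", "unknown"))
  .mapPartitions { iter =>
    val schema = schemaBCast.value     // read back from the broadcast, no meta-storage call on executors
    iter.flatMap(name => schema.labelIds.get(name))
  }

labelIds.collect()                     // Array(1, 1)
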
80 changes: 57 additions & 23 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,26 +19,26 @@

package org.apache.s2graph.s2jobs.task

import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.core.{JSONParser, Management}
import org.apache.s2graph.s2jobs.Schema
import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
import org.apache.s2graph.s2jobs.loader.HFileGenerator
import org.apache.s2graph.s2jobs.wal.utils.{DeserializeUtil, SchemaUtil}
import org.apache.spark.sql.{DataFrame, SparkSession}
import play.api.libs.json.{JsObject, Json}


/**
* Source
*
* @param conf
*/
abstract class Source(override val conf:TaskConf) extends Task {
def toDF(ss:SparkSession):DataFrame
abstract class Source(override val conf: TaskConf) extends Task {
def toDF(ss: SparkSession): DataFrame
}

class KafkaSource(conf:TaskConf) extends Source(conf) {
class KafkaSource(conf: TaskConf) extends Source(conf) {
val DEFAULT_FORMAT = "raw"

override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe")

def repartition(df: DataFrame, defaultParallelism: Int) = {
@@ -51,7 +51,7 @@ class KafkaSource(conf:TaskConf) extends Source(conf) {
}
}

override def toDF(ss:SparkSession):DataFrame = {
override def toDF(ss: SparkSession): DataFrame = {
logger.info(s"${LOG_PREFIX} options: ${conf.options}")

val format = conf.options.getOrElse("format", "raw")
@@ -61,22 +61,22 @@ class KafkaSource(conf:TaskConf) extends Source(conf) {
format match {
case "raw" => partitionedDF
case "json" => parseJsonSchema(ss, partitionedDF)
// case "custom" => parseCustomSchema(df)
// case "custom" => parseCustomSchema(df)
case _ =>
logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. use default schema ")
partitionedDF
}
}

def parseJsonSchema(ss:SparkSession, df:DataFrame):DataFrame = {
def parseJsonSchema(ss: SparkSession, df: DataFrame): DataFrame = {
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.DataType
import ss.implicits._

val schemaOpt = conf.options.get("schema")
schemaOpt match {
case Some(schemaAsJson:String) =>
val dataType:DataType = DataType.fromJson(schemaAsJson)
case Some(schemaAsJson: String) =>
val dataType: DataType = DataType.fromJson(schemaAsJson)
logger.debug(s"${LOG_PREFIX} schema : ${dataType.sql}")

df.selectExpr("CAST(value AS STRING)")
@@ -90,8 +90,9 @@ class KafkaSource(conf:TaskConf) extends Source(conf) {
}
}

class FileSource(conf:TaskConf) extends Source(conf) {
class FileSource(conf: TaskConf) extends Source(conf) {
val DEFAULT_FORMAT = "parquet"

override def mandatoryOptions: Set[String] = Set("paths")

override def toDF(ss: SparkSession): DataFrame = {
@@ -119,7 +120,8 @@ class FileSource(conf:TaskConf) extends Source(conf) {
}
}

class HiveSource(conf:TaskConf) extends Source(conf) {

class HiveSource(conf: TaskConf) extends Source(conf) {
override def mandatoryOptions: Set[String] = Set("database", "table")

override def toDF(ss: SparkSession): DataFrame = {
@@ -132,40 +134,72 @@ class HiveSource(conf:TaskConf) extends Source(conf) {
}

class S2GraphSource(conf: TaskConf) extends Source(conf) {

import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._

override def mandatoryOptions: Set[String] = Set(
S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR,
S2_SOURCE_BULKLOAD_RESTORE_PATH,
S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES
S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES,
S2_SOURCE_BULKLOAD_LABEL_NAMES
)

override def toDF(ss: SparkSession): DataFrame = {
/*
* overwrite the HBase + MetaStorage + LocalCache configuration with the given options.
*/
val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
TaskConf.parseLocalCacheConfigs(conf)
val config = Management.toConfig(mergedConf)

/*
* initialize the meta storage connection.
* note that we connect to the meta storage only once, at the spark driver,
* then build a schema manager, which is serializable, and broadcast it to executors.
*
* the schema manager is responsible for translating between the logical representation and the physical representation.
*
*/
SchemaUtil.init(config)

val sc = ss.sparkContext

val snapshotPath = conf.options(S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR)
val restorePath = conf.options(S2_SOURCE_BULKLOAD_RESTORE_PATH)
val tableNames = conf.options(S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES).split(",")
val columnFamily = conf.options.getOrElse(S2_SOURCE_BULKLOAD_HBASE_TABLE_CF, "e")
val batchSize = conf.options.getOrElse(S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE, "1000").toInt
val labelNames = conf.options(S2_SOURCE_BULKLOAD_LABEL_NAMES).split(",").toSeq

val labelMapping = Map.empty[String, String]
val buildDegree =
if (columnFamily == "v") false
else conf.options.getOrElse(S2_SOURCE_BULKLOAD_BUILD_DEGREE, "false").toBoolean
val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge")
val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge").toLowerCase

val resultSchema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema

val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
restorePath, tableNames, columnFamily, batchSize, labelMapping, buildDegree)

implicit val reader = new S2GraphCellReader(elementType)
implicit val writer = new RowDataFrameWriter

val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
val kvs = transformer.transform(cells)
val schema = SchemaUtil.buildEdgeDeserializeSchema(labelNames)
val schemaBCast = sc.broadcast(schema)
val tallSchemaVersions = Set(HBaseType.VERSION4)
val tgtDirection = 0

val results = cells.mapPartitions { iter =>
val schema = schemaBCast.value

iter.flatMap { case (_, result) =>
elementType.toLowerCase match {
case "indexedge" => DeserializeUtil.indexEdgeResultToWals(result, schema, tallSchemaVersions, tgtDirection).map(DeserializeUtil.walLogToRow)
case "snapshotedge" => DeserializeUtil.snapshotEdgeResultToWals(result, schema, tallSchemaVersions).map(DeserializeUtil.walLogToRow)
case "vertex" => DeserializeUtil.vertexResultToWals(result, schema).map(DeserializeUtil.walVertexToRow)
case _ => throw new IllegalArgumentException(s"$elementType is not supported.")
}
}
}

ss.sqlContext.createDataFrame(kvs, schema)
ss.createDataFrame(results, resultSchema)
}
}
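
Downstream, S2GraphSource behaves like any other source task: toDF restores the table snapshot, decodes each Result according to elementType, and yields a DataFrame whose rows follow Schema.EdgeSchema, or Schema.VertexSchema when the column family is "v". A short usage sketch, assuming ss is an active SparkSession and taskConf is a TaskConf already populated with the mandatory options listed above:

val source = new S2GraphSource(taskConf)   // snapshot dir, restore path, table names, label names
val walDf  = source.toDF(ss)               // decoded WAL rows

walDf.printSchema()                        // Schema.EdgeSchema (or Schema.VertexSchema for "v")
walDf.show(10, truncate = false)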