3 changes: 3 additions & 0 deletions .gitignore
@@ -106,3 +106,6 @@ server.pid
/dist/
.cache

### Redis ###
dump.rdb

28 changes: 26 additions & 2 deletions build.sbt
@@ -25,7 +25,15 @@ lazy val commonSettings = Seq(
version := "0.12.1-SNAPSHOT",
scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint"),
javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map { case (key, value) => "-D" + key + "=" + value }.toSeq,
testOptions in Test += Tests.Argument("-oDF"),
testOptions in Test ++= Seq(
Tests.Argument("-oDF"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "CommonTest"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V1Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V2Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V3Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V4Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "HBaseTest")
),
parallelExecution in Test := false,
resolvers ++= Seq(
Resolver.mavenLocal,
@@ -49,7 +57,23 @@ lazy val s2rest_netty = project
.dependsOn(s2core)
.settings(commonSettings: _*)

lazy val s2core = project.settings(commonSettings: _*)
lazy val HBaseTest = config("hbase") extend(Test)
lazy val RedisTest = config("redis") extend(Test)

lazy val s2core = project.settings(commonSettings: _*).
configs(HBaseTest).
configs(RedisTest).
settings(inConfig(RedisTest)(Defaults.testTasks): _*).
settings(
testOptions in RedisTest --= Seq(
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V1Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V2Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "V3Test"),
Tests.Argument(TestFrameworks.ScalaTest, "-n", "HBaseTest")
),
testOptions in RedisTest += Tests.Argument(TestFrameworks.ScalaTest, "-n", "RedisTest")
)


lazy val spark = project.settings(commonSettings: _*)
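
For reference, each "-n <name>" argument above tells ScalaTest to run only tests tagged with that name; the RedisTest configuration drops the version-specific and HBase tag filters and adds "-n RedisTest", so "sbt s2core/redis:test" runs only suites tagged CommonTest or RedisTest. A minimal sketch of a suite that opts in (the suite itself is hypothetical; only the tag string must match the -n argument):

import org.scalatest.{FlatSpec, Matchers, Tag}

// Tag object whose name matches the "-n RedisTest" argument in build.sbt.
object RedisTag extends Tag("RedisTest")

class RedisStorageSpec extends FlatSpec with Matchers {
  "Redis storage" should "round-trip a single edge" taggedAs RedisTag in {
    // a real test would talk to a local Redis (e.g. localhost:6379)
    (1 + 1) shouldBe 2
  }
}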

@@ -20,7 +20,7 @@
package org.apache.s2graph.loader.subscriber

import org.apache.s2graph.core.Management
import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.core.types.GraphType
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest._
import TransferToHFile._
@@ -81,7 +81,7 @@ class TransferToHFileTest extends FlatSpec with BeforeAndAfterAll with Matchers
Seq(),
"weak",
None, None,
HBaseType.DEFAULT_VERSION,
GraphType.DEFAULT_VERSION,
false,
Management.defaultCompressionAlgorithm
)
3 changes: 2 additions & 1 deletion s2core/build.sbt
@@ -35,7 +35,8 @@ libraryDependencies ++= Seq(
"org.scalikejdbc" %% "scalikejdbc" % "2.1.+",
"mysql" % "mysql-connector-java" % "5.1.28",
"org.apache.kafka" % "kafka-clients" % "0.8.2.0" excludeAll(ExclusionRule(organization = "org.slf4j"), ExclusionRule(organization = "com.sun.jdmk"), ExclusionRule(organization = "com.sun.jmx"), ExclusionRule(organization = "javax.jms")),
"com.github.danielwegener" % "logback-kafka-appender" % "0.0.4"
"com.github.danielwegener" % "logback-kafka-appender" % "0.0.4",
"redis.clients" % "jedis" % "2.7.0"
)

libraryDependencies := {
19 changes: 12 additions & 7 deletions s2core/src/main/resources/reference.conf
@@ -45,15 +45,20 @@ cache.ttl.seconds=60
cache.max.size=100000

# DB
s2graph.models.table.name = "models-dev"
db.default.driver = "com.mysql.jdbc.Driver"
db.default.url = "jdbc:mysql://"${host}":3306/graph_dev"
db.default.user = "graph"
db.default.password = "graph"
s2graph.models.table.name="models-dev"
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
db.default.user="graph"
db.default.password="graph"


akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
loggers=["akka.event.slf4j.Slf4jLogger"]
loglevel="DEBUG"
}

# hbase, redis, etc.
s2graph.storage.backend=redis

# for redis
# redis.storage=["localhost:6379"]
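
Note that this change also drops the hard-coded "s2graph.storage.backend" -> "hbase" default from Graph.DefaultConfigs (see the Graph.scala hunk below), so the value here is now the effective default. A deployment override might look like the following sketch (the redis.storage key and host-list format are assumed from the commented example above):

s2graph.storage.backend=redis
redis.storage=["redis-host-1:6379", "redis-host-2:6379"]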
5 changes: 3 additions & 2 deletions s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -22,6 +22,7 @@ package org.apache.s2graph.core
import java.util
import java.util.concurrent.ConcurrentHashMap

import org.apache.s2graph.core.storage.redis.RedisStorage
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.mysqls.{Label, Model}
import org.apache.s2graph.core.parsers.WhereParser
@@ -59,8 +60,7 @@ object Graph {
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"s2graph.storage.backend" -> "hbase"
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000)
)

var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
@@ -351,6 +351,7 @@ object Graph {
def initStorage(config: Config)(ec: ExecutionContext) = {
config.getString("s2graph.storage.backend") match {
case "hbase" => new AsynchbaseStorage(config)(ec)
case "redis" => new RedisStorage(config)(ec)
case _ => throw new RuntimeException("not supported storage.")
}
}
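
A minimal sketch of driving this switch from code, assuming the RedisStorage constructor mirrors AsynchbaseStorage's (Config)(ExecutionContext) shape:

import com.typesafe.config.ConfigFactory
import scala.concurrent.ExecutionContext.Implicits.global

val config = ConfigFactory
  .parseString("s2graph.storage.backend = redis")
  .withFallback(ConfigFactory.load())

// Graph.initStorage(config)(global) now constructs a RedisStorage
// instead of an AsynchbaseStorage.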
@@ -44,4 +44,6 @@ object GraphExceptions {
case class FetchTimeoutException(msg: String) extends Exception(msg)

case class DropRequestException(msg: String) extends Exception(msg)

case class UnsupportedVersionException(msg: String) extends Exception(msg)
}
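
Presumably the Redis backend throws this when asked to handle a schema version it does not implement; a hypothetical guard:

def requireSupportedVersion(version: String): Unit =
  if (version != GraphType.VERSION4)
    throw GraphExceptions.UnsupportedVersionException(s"unsupported schema version: $version")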
4 changes: 4 additions & 0 deletions s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -155,4 +155,8 @@ object GraphUtil {
}
}

def bytesToHexString(b: Array[Byte]): String = {
val tmp = b.map("%02x".format(_)).mkString("\\x")
if (tmp.isEmpty) "" else "\\x" + tmp
}
}
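
For example, the new helper renders each byte as a two-digit hex group with a \x prefix, which is handy for eyeballing row keys in logs (an empty array yields an empty string):

GraphUtil.bytesToHexString(Array[Byte](0x12, 0xab.toByte, 0x00))
// => "\x12\xab\x00"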
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.HBaseType._
import org.apache.s2graph.core.types.GraphType._
import org.apache.s2graph.core.types._
import play.api.libs.json.Reads._
import play.api.libs.json._
@@ -45,7 +45,7 @@ object Management extends JSONParser {

}

import HBaseType._
import GraphType._

val DefaultCompressionAlgorithm = "gz"

@@ -22,7 +22,7 @@ package org.apache.s2graph.core.mysqls
import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
import org.apache.s2graph.core.{GraphUtil, JSONParser}
import play.api.libs.json.Json
import scalikejdbc._

@@ -548,7 +548,7 @@ class RequestParser(config: Config) extends JSONParser {
// expect new label doesn't provide hTableName
val hTableName = (jsValue \ "hTableName").asOpt[String]
val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(GraphType.DEFAULT_VERSION)
val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)

@@ -217,7 +217,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
}
}

private def getVertices(jsValue: JsValue) = {
def getVertices(jsValue: JsValue) = {
val jsonQuery = jsValue
val ts = System.currentTimeMillis()
val props = "{}"
@@ -35,7 +35,7 @@ trait Deserializable[E] extends StorageDeserializable[E] {
pos += srcIdLen
val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
pos += 4
val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsSnapshot(kv.row, pos)

val rowLen = srcIdLen + 4 + 1
(srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
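
(For orientation: the row key being parsed here is laid out as [srcVertexId bytes][labelWithDir: 4 bytes][labelOrderSeq << 1 | snapshot bit: 1 byte], which is why rowLen is srcIdLen + 4 + 1.)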
@@ -27,6 +27,7 @@ object SKeyValue {
val Put = 1
val Delete = 2
val Increment = 3
val SnapshotPut = 4 // redis exclusive
val Default = Put
}
case class SKeyValue(table: Array[Byte],
@@ -39,7 +39,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Random, Try}

abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
import HBaseType._
import GraphType._

/** storage dependent configurations */
val MaxRetryNum = config.getInt("max.retry.number")
@@ -94,7 +94,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
* @param vertex: vertex to serialize
* @return serializer implementation
*/
def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
def vertexSerializer(vertex: Vertex): Serializable[Vertex] = new VertexSerializable(vertex)

/**
* create deserializer that can parse stored CanSKeyValue into snapshotEdge.
@@ -259,9 +259,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
compressionAlgorithm: String): Unit





/** Public Interface */

def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
@@ -276,6 +273,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val queryParam = QueryParam.Empty
val q = Query.toQuery(Seq(vertex), queryParam)
val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)

fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs =>
fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion)
} recoverWith { case ex: Throwable =>
@@ -316,7 +314,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) =>
val mutations = edges.flatMap { edge =>
val (_, edgeUpdate) =
if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
if (edge.op == GraphUtil.operations("delete")) {logger.debug(s">>>>> op == delete"); Edge.buildDeleteBulk(None, edge)}
else Edge.buildOperation(None, Seq(edge))
buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
@@ -442,6 +440,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
future recoverWith {
case FetchTimeoutException(retryEdge) =>
logger.error(s"\n[[ fetch timeout exception")
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
retry(tryNum + 1)(edges, statusCode)(fn)

@@ -456,6 +455,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {

Thread.sleep(Random.nextInt(MaxBackOff))
logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
logger.error(s"\n[[ Try : $tryNum - $status")
retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn)
case ex: Exception =>
logger.error("Unknown exception", ex)
@@ -545,7 +545,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
} yield {
val label = queryRequest.queryParam.label
label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
case GraphType.VERSION3 | GraphType.VERSION4 =>
if (label.consistencyLevel == "strong") {
/**
* read: snapshotEdge on queryResult = O(N)
@@ -816,7 +816,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
logger.debug(log)
// debug(ret, "acquireLock", edge.toSnapshotEdge)
} else {
throw new PartialFailureException(edge, 0, "hbase fail.")
throw new PartialFailureException(edge, 0, "acquireLock failed")
}
true
}
@@ -837,6 +837,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
val p = Random.nextDouble()
if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p")
// if (p < 0.3) throw new PartialFailureException(edge, 3, "aaa releaseLock fail.")
else {
val releaseLockEdgePut = snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head
val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
@@ -848,6 +849,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
if (ret) {
debug(ret, "releaseLock", edge.toSnapshotEdge)
} else {
logger.error(s"\n[[ release lock failed")
val msg = Seq("\nFATAL ERROR\n",
"=" * 50,
oldBytes.toList,
@@ -860,7 +862,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
)
logger.error(msg.mkString("\n"))
// error(ret, "releaseLock", edge.toSnapshotEdge)
throw new PartialFailureException(edge, 3, "hbase fail.")
throw new PartialFailureException(edge, 3, "aaa releaseLock fail.")
}
true
}
@@ -21,12 +21,12 @@ package org.apache.s2graph.core.storage

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.QueryParam
import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.types.{GraphType, InnerVal, InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.utils.logger

object StorageDeserializable {
/** Deserializer */
def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
def bytesToLabelIndexSeqWithIsSnapshot(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
val byte = bytes(offset)
val isInverted = if ((byte & 1) != 0) true else false
val labelOrderSeq = byte >> 1
@@ -85,7 +85,7 @@ object StorageDeserializable {
val kvs = new Array[(Byte, InnerValLike)](len)
var i = 0
while (i < len) {
val k = HBaseType.EMPTY_SEQ_BYTE
val k = GraphType.EMPTY_SEQ_BYTE
val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
pos += numOfBytesUsed
kvs(i) = (k -> v)
@@ -48,9 +48,9 @@ object StorageSerializable {
bytes
}

def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = {
def labelOrderSeqWithIsSnapshot(labelOrderSeq: Byte, isSnapshot: Boolean): Array[Byte] = {
assert(labelOrderSeq < (1 << 6))
val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
val byte = labelOrderSeq << 1 | (if (isSnapshot) 1 else 0)
Array.fill(1)(byte.toByte)
}
}
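
The packed byte's low bit carries the snapshot flag and its upper bits the label order seq, mirroring bytesToLabelIndexSeqWithIsSnapshot in StorageDeserializable above. A quick round-trip sketch:

val packed = StorageSerializable.labelOrderSeqWithIsSnapshot(labelOrderSeq = 5, isSnapshot = true)
// packed(0) == 0x0b, i.e. (5 << 1) | 1

val b = packed(0)
((b & 1) != 0, (b >> 1).toByte)  // (true, 5), matching the deserializer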
@@ -35,7 +35,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.apache.s2graph.core.types.{GraphType, VertexId}
import org.apache.s2graph.core.utils.{DeferCache, Extensions, FutureCache, logger}
import org.hbase.async._

@@ -196,7 +196,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))

label.schemaVersion match {
case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
case GraphType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
val scanner = client.newScanner(label.hbaseTableName.getBytes)
scanner.setFamily(edgeCf)

@@ -208,7 +208,7 @@

val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
val labelWithDirBytes = indexEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsSnapshot(indexEdge.labelIndexSeq, isSnapshot = false)
// val labelIndexSeqWithIsInvertedStopBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = true)
val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
val (startKey, stopKey) =