Skip to content
This repository was archived by the owner on Feb 16, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions s2core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.play" %% "play-json" % playVersion,
"com.typesafe.akka" %% "akka-actor" % "2.3.4",
"com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
// "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
"com.google.guava" % "guava" % "19.0" force(),
"org.apache.hbase" % "hbase-client" % hbaseVersion excludeLogging(),
"org.apache.hbase" % "hbase-common" % hbaseVersion excludeLogging(),
"org.apache.hbase" % "hbase-server" % hbaseVersion excludeLogging() exclude("com.google.protobuf", "protobuf*"),
Expand Down Expand Up @@ -58,7 +59,8 @@ libraryDependencies ++= Seq(
"org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
"net.pishen" %% "annoy4s" % annoy4sVersion,
"org.tensorflow" % "tensorflow" % tensorflowVersion,
"io.reactivex" %% "rxscala" % "0.26.5"
"io.reactivex" %% "rxscala" % "0.26.5",
"com.spotify" % "async-datastore-client" % "3.0.2" excludeLogging() exclude("com.google.guava", "guava*")
)

libraryDependencies := {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class GraphElementBuilder(graph: S2GraphLike) {
case _ =>
edge
.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
.copyTs(requestTs)
.copyOp(GraphUtil.operations("delete"))
}

val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ case class QueryParam(labelName: String,
Bytes.add(bytes, optionalCacheKey)
}

private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike]): Seq[(LabelMeta, InnerValLike)] = {
def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike]): Seq[(LabelMeta, InnerValLike)] = {
kvs.map { case (propKey, propValJs) =>
propValJs match {
case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") =>
Expand Down
2 changes: 1 addition & 1 deletion s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class SnapshotEdge(graph: S2GraphLike,
dir: Int,
op: Byte,
version: Long,
private val propsWithTs: Props,
propsWithTs: Props,
pendingEdgeOpt: Option[S2EdgeLike],
statusCode: Byte = 0,
lockTs: Option[Long],
Expand Down
2 changes: 2 additions & 0 deletions s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.datastore.DatastoreStorage
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.rocks.RocksStorage
import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
Expand Down Expand Up @@ -109,6 +110,7 @@ object S2Graph {

new AsynchbaseStorage(graph, config)
case "rocks" => new RocksStorage(graph, config)
case "datastore" => new DatastoreStorage(graph, config)
case _ => throw new RuntimeException("not supported storage.")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ object S2GraphConfigs {
S2GraphConfigs.LogConfigs.DEFAULTS

val S2GRAPH_STORE_BACKEND = "s2graph.storage.backend"
val DEFAULT_S2GRAPH_STORE_BACKEND = "hbase"
val DEFAULT_S2GRAPH_STORE_BACKEND = "datastore"
// "rocks"
// "hbase"

val PHASE = "phase"
val DEFAULT_PHASE = "dev"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,13 @@ object TraversalHelper {
val labelWeight = queryRequest.labelWeight
val where = queryParam.where.get
val isDefaultTransformer = queryParam.edgeTransformer.isDefault
val (minTs, maxTs) = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))

def validTgtVertexId(edge: S2EdgeLike): Boolean = queryParam.tgtVertexInnerIdOpt.map(edge.tgtForVertex.innerId == _).getOrElse(true)

if (where != WhereParser.success && !where.filter(edge)) Nil
else if (edge.ts < minTs || edge.ts >= maxTs) Nil
else if (!validTgtVertexId(edge)) Nil
else {
val edges = if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
edges.map { e =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package org.apache.s2graph.core.parsers
import org.apache.s2graph.core.GraphExceptions.WhereParserException
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.core.types.InnerValLike

import scala.annotation.tailrec
Expand Down Expand Up @@ -51,6 +52,17 @@ trait ExtractValue {
}
}

def anyValueToCompare(label: Label, dir: Int, key: String, value: AnyRef): InnerValLike = {
val labelMeta = label.metaPropsInvMap.getOrElse(key, throw WhereParserException(s"Where clause contains not existing property name: $key"))
val (srcColumn, tgtColumn) = label.srcTgtColumn(dir)
val dataType = key match {
case "_to" | "to" => tgtColumn.columnType
case "_from" | "from" => srcColumn.columnType
case _ => labelMeta.dataType
}
toInnerVal(value, dataType, label.schemaVersion)
}

private def edgePropToInnerVal(edge: S2EdgeLike, key: String): InnerValLike = {
val (propKey, parentEdge) = findParentEdge(edge, key)

Expand All @@ -70,14 +82,7 @@ trait ExtractValue {
else {
val (propKey, _) = findParentEdge(edge, key)

val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.getDir())
val dataType = propKey match {
case "_to" | "to" => tgtColumn.columnType
case "_from" | "from" => srcColumn.columnType
case _ => labelMeta.dataType
}
toInnerVal(value, dataType, label.schemaVersion)
anyValueToCompare(label, edge.getDir(), propKey, value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ case class Label(id: Option[Int], label: String,
lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
lazy val metaPropNames = metaProps.map(x => x.name)
lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)).toMap
lazy val validLabelMetasInvMap = labelMetas.map(x => (x.name, x)).filter(_._2.seq >= 0).toMap

/** this is used only by edgeToProps */
lazy val metaPropsDefaultMap = (for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ case class ServiceColumn(id: Option[Int],
meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion)
}.toMap
lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
lazy val validColumnMetasInvMap = ColumnMeta.findAllByColumn(id.get, useCache = true).map(meta => meta.name -> meta).toMap

def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = {
val ret = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DefaultOptimisticEdgeMutator(graph: S2GraphLike,
val futures = for {
edgeWithScore <- stepInnerResult.edgeWithScores
} yield {
val edge = edgeWithScore.edge
val edge = edgeWithScore.edge.copyTs(requestTs)

val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,46 @@ import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.utils.logger

object StorageIO {

val dummyCursor: Array[Byte] = Array.empty

def toEdges(edges: Seq[S2EdgeLike],
queryRequest: QueryRequest,
parentEdges: Seq[EdgeWithScore],
degreeEdges: Seq[EdgeWithScore],
lastCursor: Seq[Array[Byte]],
startOffset: Int = 0,
len: Int = Int.MaxValue): StepResult = {
if (edges.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
else {
val queryOption = queryRequest.query.queryOption
val queryParam = queryRequest.queryParam
val edgeWithScores = for {
(edge, idx) <- edges.zipWithIndex if idx >= startOffset && idx < startOffset + len
edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges)
} yield {
edgeWithScore
}

if (!queryOption.ignorePrevStepCache) {
StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
} else {
val sampled =
if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
else edgeWithScores

val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled

StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
}
}
}
}
class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
import TraversalHelper._
import StorageIO._

val dummyCursor: Array[Byte] = Array.empty

/** Parsing Logic: parse from kv from Storage into Edge */
def toEdge[K: CanSKeyValue](kv: K,
Expand Down Expand Up @@ -80,15 +116,14 @@ class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
}
}

//TODO: extract a method that accept Seq[S2Edge](not kvs) then build StepResult
def toEdges[K: CanSKeyValue](kvs: Seq[K],
queryRequest: QueryRequest,
prevScore: Double = 1.0,
isInnerCall: Boolean,
parentEdges: Seq[EdgeWithScore],
startOffset: Int = 0,
len: Int = Int.MaxValue): StepResult = {


val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _

if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.apache.s2graph.core.storage.datastore

import com.spotify.asyncdatastoreclient.{Datastore, QueryBuilder}
import org.apache.s2graph.core._
import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.core.storage.StorageIO
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.utils.{DeferCache, logger}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.collection.JavaConverters._

class DatastoreEdgeFetcher(graph: S2GraphLike,
datastore: Datastore) extends EdgeFetcher {

import DatastoreStorage._

lazy private val futureCache =
new DeferCache[StepResult, Promise, Future](graph.config, StepResult.Empty, "DatastoreFutureCache", false)

private def fetch(queryRequest: QueryRequest,
parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Future[StepResult] = {
val queryParam = queryRequest.queryParam

def fetchInner(query: com.spotify.asyncdatastoreclient.Query): Future[StepResult] = {
asScala(datastore.executeAsync(query)).map { queryResult =>
val edges = queryResult.getAll.asScala.map(toS2Edge(graph, _))

// not support degree edges.
val degreeEdges = Nil

// not support cursor yet.
val lastCursor = Nil

StorageIO.toEdges(edges, queryRequest, parentEdges, degreeEdges, lastCursor, queryParam.offset, queryParam.limit)
}
}

//TODO: toQuery should set up all query options property to datastore Query class.
val query = toQuery(graph, queryRequest, parentEdges)

if (queryParam.cacheTTLInMillis < 0) fetchInner(query)
else {
val fullCacheKey = queryRequest.query.fullCacheKey

futureCache.getOrElseUpdate(fullCacheKey, queryParam.cacheTTLInMillis)(fetchInner(query))
}
}

override def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
val futures = queryRequests.map { queryRequest =>
val queryOption = queryRequest.query.queryOption
val queryParam = queryRequest.queryParam
val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil

fetch(queryRequest, parentEdges)
}

Future.sequence(futures)
}

override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
val distinctLabels = labels.toSet
val kind = toKind(hTableName, EdgePostfix)

asScala(datastore.executeAsync(toQuery(kind))).map { queryResult =>
queryResult.getAll().asScala.map { entity =>
toS2Edge(graph, entity)
}.filter(e => distinctLabels(e.innerLabel))
}
}

Future.sequence(futures).map(_.flatten)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.apache.s2graph.core.storage.datastore

import com.google.common.util.concurrent.ListenableFuture
import com.spotify.asyncdatastoreclient.{Datastore, Key, QueryBuilder, TransactionResult}
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage.MutateResponse

import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._

class DatastoreEdgeMutator(graph: S2GraphLike,
datastore: Datastore) extends EdgeMutator {

import DatastoreStorage._

def fetchAndDeletes(edges: Seq[S2EdgeLike])(implicit ec: ExecutionContext) = {
if (edges.isEmpty) Future.successful(MutateResponse.Success)
else {
asScala(datastore.executeAsync(toQuery(edges.head))).flatMap { queryResult =>
val batch = QueryBuilder.batch()
queryResult.getAll.asScala.map { entity =>
batch.add(QueryBuilder.delete(entity.getKey()))
}
asScala(datastore.executeAsync(batch)).map { _ => MutateResponse.Success}
}
}
}

//TODO: pool of datastore?(lookup by zkQuorum)
override def mutateStrongEdges(zkQuorum: String,
_edges: Seq[S2EdgeLike],
withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
val grouped = _edges.groupBy { edge =>
(edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
}

val futures = grouped.map { case (_, edges) =>
val (squashedEdge, _) = S2Edge.buildOperation(None, edges)
// first delete all indexed edges.
val (outEdges, inEdges) = edges.partition(_.getDirection() == "out")

fetchAndDeletes(outEdges).flatMap { _ =>
fetchAndDeletes(inEdges).flatMap { _ =>
// val mutations = toMutationStatement(squashedEdge)
val mutations = toBatch(squashedEdge)
asScala(datastore.executeAsync(mutations))
}
}
}

//TODO: need to ensure the index of parameter sequence with correct return type
Future.sequence(futures).map(_.map(_ => true).toSeq)
}

override def mutateWeakEdges(zkQuorum: String,
_edges: Seq[S2EdgeLike],
withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
val batch = QueryBuilder.batch()
val distinct = _edges.groupBy(encodeEdgeKey).values.flatten.toSet

distinct.foreach { edge =>
toBatch(edge, batch)
}

val mutations = batch
//TODO: need to ensure the index of parameter sequence with correct return type
asScala(datastore.executeAsync(mutations)).map { _ =>
(0 until _edges.size).map(_ -> true)
}
}

override def incrementCounts(zkQuorum: String,
edges: Seq[S2EdgeLike],
withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = ???

override def updateDegree(zkQuorum: String,
edge: S2EdgeLike,
degreeVal: Long)(implicit ec: ExecutionContext): Future[MutateResponse] = ???

override def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
requestTs: Long,
retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
if (stepInnerResult.isEmpty) Future.successful(true)
else {
val edges = stepInnerResult.edgeWithScores.map(_.edge)
val head = edges.head
val zkQuorum = head.innerLabel.hbaseZkAddr

mutateWeakEdges(zkQuorum, edges, true).map { mutateResult =>
mutateResult.forall(_._2)
}
}
}
}
Loading