Skip to content
This repository was archived by the owner on Feb 16, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.typesafe.config.Config


import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

Expand Down Expand Up @@ -39,29 +40,34 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) {

def mutateElements(elements: Seq[GraphElement],
withWait: Boolean = false): Future[Seq[Boolean]] = {
val futures = elements.map {
case edge: Edge => mutateEdge(edge, withWait)
case vertex: Vertex => mutateVertex(vertex, withWait)
case element => throw new RuntimeException(s"$element is not edge/vertex")

val edgeBuffer = ArrayBuffer[Edge]()
val vertexBuffer = ArrayBuffer[Vertex]()

elements.foreach {
case e: Edge => edgeBuffer += e
case v: Vertex => vertexBuffer += v
case any@_ => logger.error(s"Unknown type: ${any}")
}
Future.sequence(futures)

val edgeFuture = mutateEdges(edgeBuffer, withWait)
val vertexFuture = mutateVertices(vertexBuffer, withWait)

val graphFuture = for {
edgesMutated <- edgeFuture
verticesMutated <- vertexFuture
} yield edgesMutated ++ verticesMutated

graphFuture
}

def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean]

def mutateEdges(edges: Seq[Edge],
withWait: Boolean = false): Future[Seq[Boolean]] = {
val futures = edges.map { edge => mutateEdge(edge, withWait) }
Future.sequence(futures)
}
def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]]

def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean]

def mutateVertices(vertices: Seq[Vertex],
withWait: Boolean = false): Future[Seq[Boolean]] = {
val futures = vertices.map { vertex => mutateVertex(vertex, withWait) }
Future.sequence(futures)
}
def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]]

def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer,
}
}

def mutateVertices(vertices: Seq[Vertex],
withWait: Boolean = false): Future[Seq[Boolean]] = {
val futures = vertices.map { vertex => mutateVertex(vertex, withWait) }
Future.sequence(futures)
}

def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = {
val defers: Seq[Deferred[(Boolean, Long)]] = for {
edge <- edges
Expand Down