diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala index bff0f3be..75ea84bf 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala @@ -8,6 +8,7 @@ import com.typesafe.config.Config import scala.collection.Seq +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -39,29 +40,34 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = elements.map { - case edge: Edge => mutateEdge(edge, withWait) - case vertex: Vertex => mutateVertex(vertex, withWait) - case element => throw new RuntimeException(s"$element is not edge/vertex") + + val edgeBuffer = ArrayBuffer[Edge]() + val vertexBuffer = ArrayBuffer[Vertex]() + + elements.foreach { + case e: Edge => edgeBuffer += e + case v: Vertex => vertexBuffer += v + case any@_ => logger.error(s"Unknown type: ${any}") } - Future.sequence(futures) + + val edgeFuture = mutateEdges(edgeBuffer, withWait) + val vertexFuture = mutateVertices(vertexBuffer, withWait) + + val graphFuture = for { + edgesMutated <- edgeFuture + verticesMutated <- vertexFuture + } yield edgesMutated ++ verticesMutated + + graphFuture } def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] - def mutateEdges(edges: Seq[Edge], - withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = edges.map { edge => mutateEdge(edge, withWait) } - Future.sequence(futures) - } + def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] - def mutateVertices(vertices: Seq[Vertex], - withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = vertices.map { vertex => mutateVertex(vertex, withWait) } - Future.sequence(futures) - } + def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 3ccb4731..7df9c9ac 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -191,6 +191,12 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, } } + def mutateVertices(vertices: Seq[Vertex], + withWait: Boolean = false): Future[Seq[Boolean]] = { + val futures = vertices.map { vertex => mutateVertex(vertex, withWait) } + Future.sequence(futures) + } + def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = { val defers: Seq[Deferred[(Boolean, Long)]] = for { edge <- edges