-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
several changes here: - addImageBlob and addPerson now return the `Vertex` for the added blob and the added `Canonical`, wrapped in a case class called `AddBlobResult` (could use a rename). This is needed because just returning the `Canonical` would require querying for the canonical's `Vertex` if we want to reference it, and that query will fail if the transaction is uncommitted. - uses of `Vertex.lift` are wrapped in try/catch blocks to catch the `OStorageException` that will be thrown if the vertex does not already exist. - The `addPerson` and `addImageBlob` both call through to a generic `addMetadataBlob` function, which will query for existing blob and canonical, or add blob & create canonical if none exists
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,15 +5,18 @@ import Types._ | |
import core.GraphError | ||
import core.GraphError._ | ||
import gremlin.scala._ | ||
import io.mediachain.util.GremlinUtils._ | ||
import Traversals.{GremlinScalaImplicits, VertexImplicits} | ||
import com.orientechnologies.orient.core.exception.OStorageException | ||
|
||
object Ingress { | ||
|
||
import Traversals.{GremlinScalaImplicits, VertexImplicits} | ||
case class BlobAddResult(blobVertex: Vertex, canonicalVertex: Vertex) | ||
|
||
def ingestBlobBundle(graph: Graph, bundle: BlobBundle | ||
, rawMetadata: Option[RawMetadataBlob] = None) | ||
: Xor[GraphError, Canonical] = { | ||
val addResultXor: Xor[GraphError, (Vertex, Canonical)] = | ||
: Xor[GraphError, Canonical] = withTransactionXor(graph) { | ||
val addResultXor: Xor[GraphError, BlobAddResult] = | ||
bundle.content match { | ||
case image: ImageBlob => { | ||
addImageBlob(graph, image, rawMetadata) | ||
|
@@ -27,16 +30,16 @@ object Ingress { | |
} | ||
|
||
addResultXor.flatMap { addResult => | ||
val (vertex, canonical) = addResult | ||
val (blobV, canonicalV) = (addResult.blobVertex, addResult.canonicalVertex) | ||
|
||
val relationshipXors = bundle.relationships.map { | ||
case BlobBundle.Author(author) => | ||
addPerson(graph, author, rawMetadata).flatMap { result => | ||
val (_, authorCanonical) = result | ||
defineAuthorship(vertex, authorCanonical) | ||
defineAuthorship(blobV, result.canonicalVertex) | ||
} | ||
} | ||
|
||
val canonical = canonicalV.toCC[Canonical] | ||
relationshipXors.foldLeft(Xor.right[GraphError, Canonical](canonical)) { | ||
case (Xor.Left(err), _) => Xor.left(err) | ||
case (_, Xor.Left(err)) => Xor.left(err) | ||
|
@@ -46,7 +49,8 @@ object Ingress { | |
} | ||
|
||
|
||
def attachRawMetadata(blobV: Vertex, raw: RawMetadataBlob): Unit = { | ||
def attachRawMetadata(blobV: Vertex, raw: RawMetadataBlob): Unit = | ||
withTransaction(blobV.graph) { | ||
val graph = blobV.graph | ||
|
||
// add the raw metadata to the graph if it doesn't already exist | ||
|
@@ -55,68 +59,82 @@ object Ingress { | |
.getOrElse(graph + raw) | ||
|
||
// check if there's already an edge from the blob vertex to the raw metadata vertex | ||
val existingBlobs = rawV.lift.in(TranslatedFrom).toSet | ||
val existingBlobs: Set[Vertex] = try { | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
yusefnapora
Author
Collaborator
|
||
rawV.in(TranslatedFrom).toSet | ||
This comment has been minimized.
Sorry, something went wrong.
parkan
Collaborator
|
||
} catch { | ||
case _: OStorageException => Set() | ||
case t: Throwable => throw t | ||
} | ||
|
||
if (!existingBlobs.contains(blobV)) { | ||
blobV --- TranslatedFrom --> rawV | ||
} | ||
} | ||
|
||
def defineAuthorship(blobV: Vertex, authorCanonical: Canonical): | ||
Xor[AuthorNotFoundError, Unit] = { | ||
authorCanonical.vertex(blobV.graph).map { authorCanonicalV => | ||
val existingAuthor = Traversals.getAuthor(blobV.lift).headOption | ||
|
||
if (!existingAuthor.contains(authorCanonicalV)) { | ||
blobV --- AuthoredBy --> authorCanonicalV | ||
def defineAuthorship(blobV: Vertex, authorCanonicalVertex: Vertex): | ||
Xor[GraphError, Unit] = { | ||
val authorshipAlreadyDefined: Xor[GraphError, Boolean] = | ||
try { | ||
blobV.lift.findAuthorXor.map { authorCanonical: Canonical => | ||
authorCanonical.canonicalID == | ||
authorCanonicalVertex.toCC[Canonical].canonicalID | ||
} | ||
|
||
Xor.right({}) | ||
}.getOrElse { | ||
Xor.left(AuthorNotFoundError(blobV)) | ||
} catch { | ||
case _: OStorageException => Xor.right(false) | ||
case t: Throwable => throw t | ||
} | ||
} | ||
|
||
// throws? | ||
def addPerson(graph: Graph, person: Person, rawMetadata: Option[RawMetadataBlob] = None): | ||
Xor[GraphError, (Vertex, Canonical)] = { | ||
// If there's an exact match already, return it, | ||
// otherwise create a new Person vertex and canonical | ||
// and return the canonical | ||
val q = Traversals.personBlobsWithExactMatch(graph.V, person) | ||
authorshipAlreadyDefined.map { defined => | ||
if (!defined) { | ||
blobV --- AuthoredBy --> authorCanonicalVertex | ||
} | ||
} | ||
} | ||
|
||
val personV: Vertex = q.headOption.getOrElse(graph + person) | ||
|
||
rawMetadata.foreach(attachRawMetadata(personV, _)) | ||
def addMetadataBlob[T <: MetadataBlob with Product : Marshallable] | ||
This comment has been minimized.
Sorry, something went wrong.
parkan
Collaborator
|
||
(graph: Graph, blob: T, rawMetadataOpt: Option[RawMetadataBlob] = None): | ||
Xor[GraphError, BlobAddResult] = withTransactionXor(graph) { | ||
val existingVertex: Option[Vertex] = blob match { | ||
case image: ImageBlob => | ||
Traversals.imageBlobsWithExactMatch(graph.V, image).headOption | ||
case person: Person => | ||
Traversals.personBlobsWithExactMatch(graph.V, person).headOption | ||
case _ => None | ||
} | ||
|
||
val canonical = graph.V(personV.id) | ||
.findCanonicalXor | ||
.getOrElse { | ||
val resultXor: Xor[GraphError, (Vertex, Vertex)] = | ||
existingVertex.map { v => | ||
Xor.fromOption( | ||
Traversals.getCanonical(v.lift).headOption, | ||
CanonicalNotFound()) | ||
.map(canonicalV => (v, canonicalV)) | ||
}.getOrElse { | ||
val v = graph + blob | ||
val canonicalV = graph + Canonical.create() | ||
canonicalV --- DescribedBy --> personV | ||
canonicalV.toCC[Canonical] | ||
canonicalV --- DescribedBy --> v | ||
Xor.right((v, canonicalV)) | ||
} | ||
|
||
rawMetadataOpt.foreach { raw => | ||
resultXor.foreach { res => | ||
attachRawMetadata(res._1, raw) | ||
} | ||
} | ||
|
||
Xor.right((personV, canonical)) | ||
resultXor.map { res => BlobAddResult(res._1, res._2) } | ||
This comment has been minimized.
Sorry, something went wrong.
parkan
Collaborator
|
||
} | ||
|
||
def addImageBlob(graph: Graph, image: ImageBlob, rawMetadata: Option[RawMetadataBlob] = None): | ||
Xor[GraphError, (Vertex, Canonical)] = { | ||
|
||
val imageV = Traversals.imageBlobsWithExactMatch(graph.V, image) | ||
.headOption.getOrElse(graph + image) | ||
|
||
rawMetadata.foreach(attachRawMetadata(imageV, _)) | ||
def addPerson(graph: Graph, person: Person, rawMetadata: Option[RawMetadataBlob] = None): | ||
Xor[GraphError, BlobAddResult] = | ||
addMetadataBlob(graph, person, rawMetadata) | ||
|
||
val canonical = graph.V(imageV.id) | ||
.findCanonicalXor | ||
.getOrElse { | ||
val canonicalVertex = graph + Canonical.create | ||
canonicalVertex --- DescribedBy --> imageV | ||
canonicalVertex.toCC[Canonical] | ||
} | ||
|
||
Xor.right((imageV, canonical)) | ||
} | ||
def addImageBlob(graph: Graph, image: ImageBlob, rawMetadata: Option[RawMetadataBlob] = None): | ||
Xor[GraphError, BlobAddResult] = | ||
addMetadataBlob(graph, image, rawMetadata) | ||
|
||
|
||
def modifyImageBlob(graph: Graph, parentVertex: Vertex, photo: ImageBlob, raw: Option[RawMetadataBlob] = None): | ||
|
Ok, so let me see if I'm getting this right:
.in(...
will throwOStorageException
-- does this mean that if I runattachRawMetadata
twice with the sameblobV
andraw
in the same transaction it will attach twice?It seems like we are doing work that the db should be doing for us. According to this issue there should already be an SQL-land upsert for edges, is the issue that Gremlin does not provide this facility?