Skip to content

Commit

Permalink
spline #684 Call Foxx service
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Aug 20, 2020
1 parent 38fc8a9 commit 4b6c3a5
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,27 @@
*/
package za.co.absa.spline.consumer.service.repo

import java.util.concurrent.CompletionException

import com.arangodb.ArangoDBException
import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.internal.ArangoDatabaseImplicits.InternalArangoDatabaseOps
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Repository
import za.co.absa.spline.consumer.service.model.LineageOverview

import scala.PartialFunction.cond
import scala.concurrent.{ExecutionContext, Future}

@Repository
class LineageRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends LineageRepository {

import za.co.absa.spline.persistence.ArangoImplicits._

override def lineageOverviewForExecutionEvent(eventId: String, maxDepth: Int)(implicit ec: ExecutionContext): Future[LineageOverview] = db
.queryOne[LineageOverview](
"""
|WITH progress, progressOf, executionPlan, affects, dataSource
|LET executionEvent = FIRST(FOR p IN progress FILTER p._key == @eventId RETURN p)
|LET targetDataSource = FIRST(FOR ds IN 2 OUTBOUND executionEvent progressOf, affects RETURN ds)
|LET lineageGraph = SPLINE::EVENT_LINEAGE_OVERVIEW(executionEvent, @maxDepth)
|
|RETURN lineageGraph && {
| "info": {
| "timestamp" : executionEvent.timestamp,
| "applicationId" : executionEvent.extra.appId,
| "targetDataSourceId": targetDataSource._key
| },
| "graph": {
| "depthRequested": @maxDepth,
| "depthComputed": lineageGraph.depth || -1,
| "nodes": lineageGraph.vertices,
| "edges": lineageGraph.edges
| }
|}
|""".stripMargin,
Map(
"eventId" -> eventId,
"maxDepth" -> (maxDepth: Integer))
)
.filter(null.!=)
override def lineageOverviewForExecutionEvent(eventId: String, maxDepth: Int)(implicit ec: ExecutionContext): Future[LineageOverview] =
db
.foxxGet[LineageOverview](s"/spline/events/$eventId/lineage-overview/$maxDepth")
.recover({
case ce: CompletionException
if cond(ce.getCause)({ case ae: ArangoDBException => ae.getResponseCode == 404 }) =>
throw new NoSuchElementException(s"Event ID: $eventId")
})
}
2 changes: 1 addition & 1 deletion persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>
<version>6.6.3</version>
<version>6.7.3</version>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
Expand Down
43 changes: 22 additions & 21 deletions persistence/src/main/resources/foxx/spline/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ const eventLineageOverview = (function () {
return null;
}

console.log("startEvent", startEvent);

const startSource = db._query(aql`
FOR ds IN 2 OUTBOUND ${startEvent} progressOf, affects
LIMIT 1
Expand Down Expand Up @@ -218,27 +216,30 @@ router
RETURN FIRST(FOR p IN progress FILTER p._key == ${eventKey} RETURN p)
`).next();

const targetDataSource = db._query(aql`
WITH progress, progressOf, executionPlan, affects, dataSource
RETURN FIRST(FOR ds IN 2 OUTBOUND ${executionEvent} progressOf, affects RETURN ds)
`).next();

const lineageGraph = eventLineageOverview(executionEvent, maxDepth);
const lineageOverview = lineageGraph && {
"info": {
"timestamp": executionEvent.timestamp,
"applicationId": executionEvent.extra.appId,
"targetDataSourceId": targetDataSource._key
},
"graph": {
"depthRequested": maxDepth,
"depthComputed": lineageGraph.depth || -1,
"nodes": lineageGraph.vertices,
"edges": lineageGraph.edges
if (executionEvent) {
const targetDataSource = executionEvent && db._query(aql`
WITH progress, progressOf, executionPlan, affects, dataSource
RETURN FIRST(FOR ds IN 2 OUTBOUND ${executionEvent} progressOf, affects RETURN ds)
`).next();

const lineageGraph = eventLineageOverview(executionEvent, maxDepth);
const lineageOverview = lineageGraph && {
"info": {
"timestamp": executionEvent.timestamp,
"applicationId": executionEvent.extra.appId,
"targetDataSourceId": targetDataSource._key
},
"graph": {
"depthRequested": maxDepth,
"depthComputed": lineageGraph.depth || -1,
"nodes": lineageGraph.vertices,
"edges": lineageGraph.edges
}
}
res.send(lineageOverview);
} else {
res.status(404);
}

res.send(lineageOverview);
})
.pathParam('eventKey', joi.string().min(1).required(), 'Execution Event UUID')
.pathParam('maxDepth', joi.number().integer().min(0).required(), 'Max depth of traversing in terms of [Data Source] -> [Execution Plan] pairs')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.arangodb.async.internal

import java.net.URI
import java.util.Collections

object ArangoDatabaseAsyncImplImplicits {

implicit class ArangoDatabaseAsyncImplOps(val dbImpl: ArangoDatabaseAsyncImpl) extends AnyVal {
def route(path: URI): ArangoRouteAsyncImpl =
new ArangoRouteAsyncImpl(dbImpl, path.toString, Collections.emptyMap[String, String])
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package com.arangodb.internal {


import java.net.URI

import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.async.internal.ArangoExecutorAsync
import com.arangodb.async.internal.velocystream.VstCommunicationAsync
import com.arangodb.async.internal.{ArangoDatabaseAsyncImpl, ArangoExecutorAsync}
import com.arangodb.internal.velocystream.ConnectionParams
import org.apache.commons.io.IOUtils
import org.apache.http.auth.UsernamePasswordCredentials
Expand All @@ -30,17 +32,34 @@ package com.arangodb.internal {
import za.co.absa.commons.lang.ARM.managed
import za.co.absa.commons.reflect.ReflectionUtils

import scala.concurrent.Future
import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

/**
* A workaround for the ArangoDB Java Driver issue #353
*
* @see [[https://github.com/arangodb/arangodb-java-driver/issues/353]]
* A set of workarounds for the ArangoDB Java Driver
*/
object InternalArangoDatabaseImplicits {
object ArangoDatabaseImplicits {

implicit class InternalArangoDatabaseOps(val db: ArangoDatabaseAsync) extends AnyVal {

implicit class InternalArangoDatabaseOps(db: ArangoDatabaseAsync) {
/**
* @see [[https://github.com/arangodb/arangodb-java-driver/issues/357]]
*/
def foxxGet[A: ClassTag](path: String)(implicit ec: ExecutionContext): Future[A] = {
import com.arangodb.async.internal.ArangoDatabaseAsyncImplImplicits._
val resType = implicitly[ClassTag[A]].runtimeClass
db.asInstanceOf[ArangoDatabaseAsyncImpl]
.route(new URI(path))
.get()
.toScala
.map(resp =>
db.util().deserialize(resp.getBody, resType): A)
}

/**
* @see [[https://github.com/arangodb/arangodb-java-driver/issues/353]]
*/
def adminExecute(script: String): Future[Unit] = {
val executor = db.asInstanceOf[ArangoExecuteable[_ <: ArangoExecutorAsync]].executor
val dbName = db.name
Expand Down Expand Up @@ -85,38 +104,4 @@ package com.arangodb.internal {

}

package velocystream {

import com.arangodb.internal.net.{AccessType, HostDescription, HostHandler}

import scala.reflect.ClassTag

object ConnectionParams {
def unapply(comm: VstCommunicationAsync): Option[(String, Int, Option[String], Option[String])] = {
val user = comm.user
val password = comm.password
val hostHandler = extractFieldValue[VstCommunication[_, _], HostHandler](comm, "hostHandler")
val host = hostHandler.get(null, AccessType.WRITE)
val hostDescr = ReflectionUtils.extractFieldValue[HostDescription](host, "description")
Some((
hostDescr.getHost,
hostDescr.getPort,
Option(user),
Option(password)
))
}

/**
* Remove when https://github.com/AbsaOSS/commons/issues/28 is fixed
*/
private def extractFieldValue[A: ClassTag, B](o: AnyRef, fieldName: String) = {
val declaringClass = implicitly[ClassTag[A]]
val field = declaringClass.runtimeClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(o).asInstanceOf[B]
}
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.arangodb.internal.velocystream

import com.arangodb.async.internal.velocystream.VstCommunicationAsync
import com.arangodb.internal.net.{AccessType, HostDescription, HostHandler}
import za.co.absa.commons.reflect.ReflectionUtils

import scala.reflect.ClassTag

object ConnectionParams {
def unapply(comm: VstCommunicationAsync): Option[(String, Int, Option[String], Option[String])] = {
val user = comm.user
val password = comm.password
val hostHandler = extractFieldValue[VstCommunication[_, _], HostHandler](comm, "hostHandler")
val host = hostHandler.get(null, AccessType.WRITE)
val hostDescr = ReflectionUtils.extractFieldValue[HostDescription](host, "description")
Some((
hostDescr.getHost,
hostDescr.getPort,
Option(user),
Option(password)
))
}

/**
* Remove when https://github.com/AbsaOSS/commons/issues/28 is fixed
*/
private def extractFieldValue[A: ClassTag, B](o: AnyRef, fieldName: String) = {
val declaringClass = implicitly[ClassTag[A]]
val field = declaringClass.runtimeClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(o).asInstanceOf[B]
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Migrator(
log.info(s"Upgrading to version: ${version.asString}")
log.trace(s"Applying script: \n$script")

import com.arangodb.internal.InternalArangoDatabaseImplicits._
import com.arangodb.internal.ArangoDatabaseImplicits._

for {
_ <- db.adminExecute(
Expand Down

0 comments on commit 4b6c3a5

Please sign in to comment.