Skip to content

Commit

Permalink
spline #1155 Refactor FoxxTxBuilder to FoxxRouter
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Sep 26, 2024
1 parent 6ff3017 commit 8b391d9
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 88 deletions.
2 changes: 1 addition & 1 deletion arangodb-foxx-services/src/main/services/commons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export function eventLineageOverviewGraph(observedByEventFn: (p: Progress, rtxIn
}

const aqlGen = new AQLCodeGenHelper(rtxInfo)
const genTxIsolationCodeForTraversal = memoize((...keys) => keys, aqlGen.genTxIsolationCodeForTraversal)
const genTxIsolationCodeForTraversal = memoize((...keys) => keys, aqlGen.genTxIsolationCodeForTraversal).bind(aqlGen)
const startSource = getStartDataSourceFromExecutionEvent(startEvent, genTxIsolationCodeForTraversal)
const graphBuilder = new GraphBuilder([startSource])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.consumer.service.repo

import com.arangodb.ArangoDBException
import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.internal.util.ArangoSerializationFactory.Serializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Repository
import za.co.absa.spline.consumer.service.model.LineageOverview
import za.co.absa.spline.consumer.service.model.ExecutionEventInfo.Id
import za.co.absa.spline.consumer.service.model.LineageOverview
import za.co.absa.spline.persistence.FoxxRouter

import java.util.concurrent.CompletionException
import scala.PartialFunction.cond
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._

@Repository
class ImpactLineageRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends LineageRepository with ImpactRepository {
class ImpactLineageRepositoryImpl @Autowired()(foxxRouter: FoxxRouter) extends LineageRepository with ImpactRepository {

override def lineageOverviewForExecutionEvent(eventId: Id, maxDepth: Int)
(implicit ec: ExecutionContext): Future[LineageOverview] = {
generalLineageOverviewForExecutionEvent(s"/spline/execution-events/$eventId/lineage-overview/$maxDepth", eventId)
override def lineageOverviewForExecutionEvent(eventId: Id, maxDepth: Int)(implicit ec: ExecutionContext): Future[LineageOverview] = {
foxxRouter.get[LineageOverview](s"/spline/execution-events/$eventId/lineage-overview/$maxDepth")
}

override def impactOverviewForExecutionEvent(eventId: Id, maxDepth: Int)
(implicit ec: ExecutionContext): Future[LineageOverview] = {
generalLineageOverviewForExecutionEvent(s"/spline/execution-events/$eventId/impact-overview/$maxDepth", eventId)
override def impactOverviewForExecutionEvent(eventId: Id, maxDepth: Int)(implicit ec: ExecutionContext): Future[LineageOverview] = {
foxxRouter.get[LineageOverview](s"/spline/execution-events/$eventId/impact-overview/$maxDepth")
}

private def generalLineageOverviewForExecutionEvent(routeUrl: String, eventId: Id)(implicit ec: ExecutionContext): Future[LineageOverview] =
db
.route(routeUrl)
.get()
.asScala
.map(resp => db.util(Serializer.CUSTOM).deserialize[LineageOverview](resp.getBody, classOf[LineageOverview]))
.recover({
case ce: CompletionException
if cond(ce.getCause)({ case ae: ArangoDBException => ae.getResponseCode == 404 }) =>
throw new NoSuchElementException(s"Event ID: $eventId")
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class ArangoRepoConfig extends InitializingBean with LazyLogging {

@Bean def arangoDatabase: ArangoDatabaseAsync = arangoDatabaseFacade.db

@Bean def foxxRouter: FoxxRouter = new FoxxRouter(arangoDatabase)

@Bean def databaseVersionManager: DatabaseVersionManager = new DatabaseVersionManager(arangoDatabase)

@Bean def databaseVersionChecker: DatabaseVersionChecker = new DatabaseVersionChecker(databaseVersionManager)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.persistence

import com.arangodb.ArangoDBException
import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.internal.util.ArangoSerializationFactory.Serializer
import org.springframework.beans.factory.annotation.Autowired

import java.util.concurrent.CompletionException
import scala.PartialFunction.cond
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._
import scala.reflect.ClassTag

class FoxxRouter @Autowired()(db: ArangoDatabaseAsync) {
private val serialization = db.util(Serializer.CUSTOM)

def get[A: ClassTag](endpoint: String)(implicit ex: ExecutionContext): Future[A] = {
val aType = implicitly[ClassTag[A]].runtimeClass
db
.route(endpoint)
.get()
.asScala
.map(resp => serialization.deserialize[A](resp.getBody, aType))
.recover({
case ce: CompletionException
if cond(ce.getCause)({ case ae: ArangoDBException => ae.getResponseCode == 404 }) =>
throw new NoSuchElementException(s"Resource NOT FOUND: $endpoint")
})
}

def post[A: ClassTag](endpoint: String, body: AnyRef)(implicit ex: ExecutionContext): Future[A] = {
val serializedBody = serialization.serialize(body)
db
.route(endpoint)
.withBody(serializedBody)
.post()
.asScala
.asInstanceOf[Future[A]]
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ import com.typesafe.scalalogging.LazyLogging
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Repository
import za.co.absa.spline.common.AsyncCallRetryer
import za.co.absa.spline.persistence.ArangoImplicits
import za.co.absa.spline.persistence.model._
import za.co.absa.spline.persistence.tx._
import za.co.absa.spline.producer.model.v1_2.ExecutionEvent._
import za.co.absa.spline.persistence.{ArangoImplicits, FoxxRouter}
import za.co.absa.spline.producer.model.{v1_2 => apiModel}
import za.co.absa.spline.producer.service.UUIDCollisionDetectedException
import za.co.absa.spline.producer.service.model.{ExecutionEventKeyCreator, ExecutionPlanPersistentModel, ExecutionPlanPersistentModelBuilder}
import za.co.absa.spline.producer.service.model.{ExecutionEventKeyCreator, ExecutionPlanPersistentModelBuilder}

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._
import scala.util.control.NonFatal

@Repository
class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync, retryer: AsyncCallRetryer) extends ExecutionProducerRepository
class ExecutionProducerRepositoryImpl @Autowired()(
db: ArangoDatabaseAsync,
foxxRouter: FoxxRouter,
retryer: AsyncCallRetryer
) extends ExecutionProducerRepository
with LazyLogging {

import ArangoImplicits._
Expand Down Expand Up @@ -78,8 +80,8 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync, retr
case None =>
// no execution plan with the given ID found
val eppm = ExecutionPlanPersistentModelBuilder.toPersistentModel(executionPlan, persistedDSKeyByURI)
val tx = createExecutionPlanTransaction(eppm)
tx.execute[Any](db)

foxxRouter.post("/spline/execution-plans", eppm)
}
} yield ()
})
Expand Down Expand Up @@ -119,8 +121,8 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync, retr
planKey = e.planId.toString,
execPlanDetails = null // the value is populated below in the transaction script
)
val tx = createExecutionEventTransaction(p)
tx.execute[Any](db)

foxxRouter.post("/spline/execution-events", p)
}
} yield ()
})
Expand All @@ -140,16 +142,7 @@ class ExecutionProducerRepositoryImpl @Autowired()(db: ArangoDatabaseAsync, retr
}
}

object ExecutionProducerRepositoryImpl {

private def createExecutionPlanTransaction(eppm: ExecutionPlanPersistentModel) = {
new FoxxPostTxBuilder("/spline/execution-plans", eppm).buildTx()
}

private def createExecutionEventTransaction(p: Progress): ArangoTx = {
new FoxxPostTxBuilder("/spline/execution-events", p).buildTx()
}

private object ExecutionProducerRepositoryImpl {
private def ensureNoExecPlanIDCollision(
planId: apiModel.ExecutionPlan.Id,
actualDiscriminator: apiModel.ExecutionPlan.Discriminator,
Expand Down

0 comments on commit 8b391d9

Please sign in to comment.