Skip to content

Commit

Permalink
spline #1155 Move /execution-plans/:planId/data-sources?access=...
Browse files Browse the repository at this point in the history
…handler to Foxx
  • Loading branch information
wajda committed Sep 2, 2024
1 parent 94b343d commit f83a56b
Show file tree
Hide file tree
Showing 20 changed files with 130 additions and 82 deletions.
2 changes: 1 addition & 1 deletion arangodb-foxx-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
<dependency>
<groupId>com.github.wajda</groupId>
<artifactId>scala-ts_${scala.compat.version}</artifactId>
<version>0.4.1.10</version>
<version>0.4.1.11</version>
</dependency>
</dependencies>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package za.co.absa.spline.consumer.service.model

import za.co.absa.commons.reflect.EnumerationMacros

sealed abstract class DataSourceActionType(val name: String)
sealed abstract class DataSourceActionType(val name: String) {
override def toString: String = name
}

object DataSourceActionType {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 ABSA Group Limited
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { CollectionName, ReadTxInfo, TxAwareDocument, WriteTxInfo } from '../persistence/model'
import { CollectionName, ReadTxInfo, TxAwareDocument, WriteTxInfo } from './model'
import { aql, db } from '@arangodb'
import { DocumentKey } from '../model'
import { TxManager } from './txm'
Expand Down Expand Up @@ -58,7 +58,7 @@ function deleteByKey(colName: CollectionName, key: DocumentKey): DocumentMetadat
return db._remove({ _id: `${colName}/${key}` }, { silent: true })
}

export function checkKeyExistence(colName: CollectionName, key: string, discriminator: string = null, rtxInfo: ReadTxInfo = null): boolean {
export function checkKeyExistence(colName: CollectionName, key: DocumentKey, discriminator: string = null, rtxInfo: ReadTxInfo = null): boolean {
const aqlGen = new AQLCodeGenHelper(rtxInfo)

const docDiscriminatorCursor: ArangoDB.Cursor<string> = db._query(aql`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { TxManager } from './tx-manager'
import events from 'events'
import { ReadTxInfo, TxAwareDocument, TxEvent, TxId, TxParams, WriteTxInfo } from '../../persistence/model'
import { ReadTxInfo, TxAwareDocument, TxEvent, TxId, TxParams, WriteTxInfo } from '../model'
import * as Logger from '../../utils/logger'


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
TxNum,
TxParams,
WriteTxInfo
} from '../../persistence/model'
} from '../model'
import { store } from '../store'
import { aql, db } from '@arangodb'
import * as Logger from '../../utils/logger'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { ReadTxInfo, TxAwareDocument, TxId, TxParams, WriteTxInfo } from '../../persistence/model'
import { ReadTxInfo, TxAwareDocument, TxId, TxParams, WriteTxInfo } from '../model'


export interface TxManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { TxParams, WriteTxInfo } from '../../persistence/model'
import { TxParams, WriteTxInfo } from '../model'
import { TxManager } from './index'


Expand Down
22 changes: 9 additions & 13 deletions arangodb-foxx-services/src/main/routes/events-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import Joi from 'joi'
import { lineageOverview } from '../services/lineage-overview'
import { impactOverview } from '../services/impact-overview'
import { Progress } from '../../external/api.model'
import { listExecutionEventInfo_groupedByDataSource, listExecutionEvents, storeExecutionEvent } from '../services/execution-event-store'
import { TxManager } from '../services/txm'
import { checkKeyExistence } from '../services/store'
import { NodeCollectionName } from '../persistence/model'
import * as Logger from '../utils/logger'
import {
checkExecutionEventExists,
listExecutionEventInfo_groupedByDataSource,
listExecutionEvents,
storeExecutionEvent
} from '../services/execution-event-store'


export const eventsRouter: Foxx.Router = createRouter()
Expand All @@ -47,7 +48,6 @@ eventsRouter
eventsRouter
.get('/',
(req: Foxx.Request, res: Foxx.Response) => {
Logger.debug(`Foxx: GET ${req.url}`)
const events = listExecutionEvents(
req.queryParams.asAtTime,
req.queryParams.timestampStart,
Expand Down Expand Up @@ -89,7 +89,6 @@ eventsRouter
eventsRouter
.get('/_grouped-by-ds',
(req: Foxx.Request, res: Foxx.Response) => {
Logger.debug(`Foxx: GET ${req.url}`)
const events = listExecutionEventInfo_groupedByDataSource(
req.queryParams.asAtTime,
req.queryParams.timestampStart,
Expand Down Expand Up @@ -133,8 +132,7 @@ eventsRouter
eventsRouter
.get('/:eventId/_exists',
(req: Foxx.Request, res: Foxx.Response) => {
const exists = checkKeyExistence(
NodeCollectionName.Progress,
const exists = checkExecutionEventExists(
req.pathParams.eventId,
req.queryParams.discriminator
)
Expand All @@ -152,8 +150,7 @@ eventsRouter
(req: Foxx.Request, res: Foxx.Response) => {
const eventKey = req.pathParams.eventKey
const maxDepth = req.pathParams.maxDepth
const rtxInfo = TxManager.startRead()
const overview = lineageOverview(eventKey, maxDepth, rtxInfo)
const overview = lineageOverview(eventKey, maxDepth)
if (overview) {
res.send(overview)
}
Expand All @@ -175,8 +172,7 @@ eventsRouter
(req: Foxx.Request, res: Foxx.Response) => {
const eventKey = req.pathParams.eventKey
const maxDepth = req.pathParams.maxDepth
const rtxInfo = TxManager.startRead()
const overview = impactOverview(eventKey, maxDepth, rtxInfo)
const overview = impactOverview(eventKey, maxDepth)
if (overview) {
res.send(overview)
}
Expand Down
23 changes: 17 additions & 6 deletions arangodb-foxx-services/src/main/routes/plans-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
*/

import { createRouter } from '@arangodb/foxx'
import { ExecutionPlanPersistentModel } from '../../external/api.model'
import { storeExecutionPlan } from '../services/execution-plan-store'
import { DataSourceActionType, ExecutionPlanPersistentModel } from '../../external/api.model'
import { checkExecutionPlanExists, getDataSourceURIsByActionType, storeExecutionPlan } from '../services/execution-plan-store'
import Joi from 'joi'
import { checkKeyExistence } from '../services/store'
import { NodeCollectionName } from '../persistence/model'


export const plansRouter: Foxx.Router = createRouter()
Expand All @@ -37,11 +35,24 @@ plansRouter
.summary('Register a new execution plan')


plansRouter
.get('/:planId/data-sources',
(req: Foxx.Request, res: Foxx.Response) => {
const uris = getDataSourceURIsByActionType(
req.pathParams.planId,
req.queryParams.access
)
res.send(uris)
})
.pathParam('planId', Joi.string().min(1).required(), 'Execution Plan ID')
.queryParam('access', Joi.string().optional().valid(DataSourceActionType.values).default(null), 'Access type (read/write) to filter by')
.response(200, ['application/json'], 'Array of data source URIs')


plansRouter
.get('/:planId/_exists',
(req: Foxx.Request, res: Foxx.Response) => {
const exists = checkKeyExistence(
NodeCollectionName.ExecutionPlan,
const exists = checkExecutionPlanExists(
req.pathParams.planId,
req.queryParams.discriminator
)
Expand Down
17 changes: 13 additions & 4 deletions arangodb-foxx-services/src/main/services/execution-event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@


import { ExecPlanDetails, ExecutionEventInfo, Frame, Label, Progress } from '../../external/api.model'
import { CollectionName, edge, ViewName, WriteTxInfo } from '../persistence/model'
import { store } from './store'
import { CollectionName, edge, NodeCollectionName, ViewName, WriteTxInfo } from '../persistence/model'
import { checkKeyExistence, store } from '../persistence/store'
import { aql, db } from '@arangodb'
import { withTimeTracking } from '../utils/common'
import { TxManager } from './txm'
import { TxTemplate } from './txm/tx-template'
import { TxManager } from '../persistence/txm'
import { TxTemplate } from '../persistence/txm/tx-template'
import * as Logger from '../utils/logger'
import { DocumentKey } from '../model'


const EVENT_SEARCH_FIELDS = [
Expand Down Expand Up @@ -198,6 +199,14 @@ export function listExecutionEventInfo_groupedByDataSource(
}
}

export function checkExecutionEventExists(eventKey: DocumentKey, discriminator: string): boolean {
return checkKeyExistence(
NodeCollectionName.Progress,
eventKey,
discriminator
)
}

export function storeExecutionEvent(progress: Progress): void {
withTimeTracking(`STORE EVENT ${progress._key}`, () => {
const planKey = progress.planKey
Expand Down
44 changes: 40 additions & 4 deletions arangodb-foxx-services/src/main/services/execution-plan-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,26 @@
*/


import { ExecutionPlanPersistentModel } from '../../external/api.model'
import { CollectionName, WriteTxInfo } from '../persistence/model'
import { store } from './store'
import { DataSourceActionType, ExecutionPlanPersistentModel } from '../../external/api.model'
import { CollectionName, EdgeCollectionName, NodeCollectionName, WriteTxInfo } from '../persistence/model'
import { checkKeyExistence, store } from '../persistence/store'
import { withTimeTracking } from '../utils/common'
import { TxTemplate } from './txm/tx-template'
import { TxTemplate } from '../persistence/txm/tx-template'
import { DocumentKey } from '../model'
import { DataSourceActionTypeValue } from './model'
import { aql, db } from '@arangodb'
import { AQLCodeGenHelper } from '../utils/aql-gen-helper'
import { TxManager } from '../persistence/txm'


export function checkExecutionPlanExists(planKey: DocumentKey, discriminator: string): boolean {
return checkKeyExistence(
NodeCollectionName.ExecutionPlan,
planKey,
discriminator
)
}

export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void {
const execPlanKey = eppm.executionPlan._key
withTimeTracking(`STORE PLAN ${execPlanKey}`, () => {
Expand Down Expand Up @@ -65,3 +78,26 @@ export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void {
})
}

export function getDataSourceURIsByActionType(planKey: DocumentKey, access: DataSourceActionTypeValue): string[] {
const rtxInfo = TxManager.startRead()
const aqlGen = new AQLCodeGenHelper(rtxInfo)

let edges: EdgeCollectionName[]
if (access === DataSourceActionType.Read.name) {
edges = [EdgeCollectionName.Depends]
}
else if (access === DataSourceActionType.Write.name) {
edges = [EdgeCollectionName.Affects]
}
else {
edges = [EdgeCollectionName.Depends, EdgeCollectionName.Affects]
}

return db._query(aql`
WITH ${aql.literal([...edges, NodeCollectionName.DataSource].join(', '))}
FOR ds IN 1..1
OUTBOUND DOCUMENT('executionPlan', ${planKey}) ${aql.literal(edges.join(', '))}
${aqlGen.genTxIsolationCodeForTraversal('ds')}
RETURN ds.uri
`).toArray()
}
5 changes: 3 additions & 2 deletions arangodb-foxx-services/src/main/services/impact-overview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
} from './commons'
import { Progress } from '../../external/api.model'
import { ReadTxInfo } from '../persistence/model'
import { TxManager } from '../persistence/txm'


/**
Expand All @@ -33,10 +34,10 @@ import { ReadTxInfo } from '../persistence/model'
* @param eventKey read event key
* @param maxDepth maximum number of job nodes in any path of the resulted graph (excluding cycles).
* It shows how far the traversal should look for the impact (forward-lineage).
* @param rtxInfo read tx info
* @returns za.co.absa.spline.consumer.service.model.LineageOverview
*/
export function impactOverview(eventKey: DocumentKey, maxDepth: number, rtxInfo: ReadTxInfo): LineageOverview {
export function impactOverview(eventKey: DocumentKey, maxDepth: number): LineageOverview {
const rtxInfo = TxManager.startRead()

const executionEvent: Progress = getExecutionEventFromEventKey(eventKey, rtxInfo)
const targetDataSource: DataSource = executionEvent && getTargetDataSourceFromExecutionEvent(executionEvent, rtxInfo)
Expand Down
5 changes: 3 additions & 2 deletions arangodb-foxx-services/src/main/services/lineage-overview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
} from './commons'
import { Progress } from '../../external/api.model'
import { ReadTxInfo } from '../persistence/model'
import { TxManager } from '../persistence/txm'


/**
Expand All @@ -33,10 +34,10 @@ import { ReadTxInfo } from '../persistence/model'
* @param eventKey write event key
* @param maxDepth maximum number of job nodes in any path of the resulted graph (excluding cycles).
* It shows how far the traversal should look for the lineage.
* @param rtxInfo read tx info
* @returns za.co.absa.spline.consumer.service.model.LineageOverview
*/
export function lineageOverview(eventKey: DocumentKey, maxDepth: number, rtxInfo: ReadTxInfo): LineageOverview {
export function lineageOverview(eventKey: DocumentKey, maxDepth: number): LineageOverview {
const rtxInfo = TxManager.startRead()

const executionEvent: Progress = getExecutionEventFromEventKey(eventKey, rtxInfo)
const targetDataSource: DataSource = executionEvent && getTargetDataSourceFromExecutionEvent(executionEvent, rtxInfo)
Expand Down
20 changes: 20 additions & 0 deletions arangodb-foxx-services/src/main/services/model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { DataSourceActionType } from '../../external/api.model'


export type DataSourceActionTypeValue = typeof DataSourceActionType.values[number]
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { TxManager } from '../../main/services/txm'
import { TxManager } from '../../main/persistence/txm'
import { TxEvent, WriteTxInfo } from '../../main/persistence/model'
import * as Logger from '../../main/utils/logger'
import { aql, db } from '@arangodb'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { ReadTxInfo, TxAwareDocument, TxEvent, TxId, TxParams, WriteTxInfo } from '../../../../src/main/persistence/model'
import { SubscribableTxManagerDecorator } from '../../../../src/main/services/txm/subcribable-tx-manager-decorator'
import { SubscribableTxManagerDecorator } from '../../../../src/main/persistence/txm/subcribable-tx-manager-decorator'


const mockTxManagerImpl = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package za.co.absa.spline.consumer.service.repo

import com.fasterxml.jackson.core.`type`.TypeReference
import za.co.absa.spline.consumer.service.model._

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -27,3 +28,8 @@ trait DataSourceRepository extends AbstractExecutionEventRepository {
(implicit ec: ExecutionContext): Future[Array[String]]

}

object DataSourceRepository {
implicit val typeRefArrayOfString: TypeReference[Array[String]] = new TypeReference[Array[String]] {}

}
Loading

0 comments on commit f83a56b

Please sign in to comment.