Skip to content

Commit

Permalink
#1693 VersionedModelService divided into V2/V3 - these are now traits.
Browse files Browse the repository at this point in the history
  • Loading branch information
dk1844 committed May 16, 2022
1 parent 4574b09 commit fe56a6c
Show file tree
Hide file tree
Showing 18 changed files with 170 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ abstract class VersionedModelController[C <: VersionedModel with Product with Au
@ResponseStatus(HttpStatus.CREATED)
def importSingleEntity(@AuthenticationPrincipal principal: UserDetails,
@RequestBody importObject: ExportableObject[C]): CompletableFuture[C] = {
versionedModelService.importSingleItemV2(importObject.item, principal.getUsername, importObject.metadata).map {
case Some(entity) => entity
versionedModelService.importSingleItem(importObject.item, principal.getUsername, importObject.metadata).map {
case Some((entity, validation)) => entity // validation is disregarded for V2
case None => throw notFound()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import za.co.absa.enceladus.rest_api.controllers.v3.VersionedModelControllerV3.L
import za.co.absa.enceladus.rest_api.exceptions.{EntityDisabledException, NotFoundException, ValidationException}
import za.co.absa.enceladus.rest_api.models.rest.DisabledPayload
import za.co.absa.enceladus.rest_api.services.VersionedModelService
import za.co.absa.enceladus.rest_api.services.v3.VersionedModelServiceV3

import java.net.URI
import java.util.Optional
Expand All @@ -42,7 +43,7 @@ object VersionedModelControllerV3 {
}

abstract class VersionedModelControllerV3[C <: VersionedModel with Product
with Auditable[C]](versionedModelService: VersionedModelService[C]) extends BaseController {
with Auditable[C]](versionedModelService: VersionedModelServiceV3[C]) extends BaseController {

import za.co.absa.enceladus.rest_api.utils.implicits._

Expand Down Expand Up @@ -110,7 +111,7 @@ abstract class VersionedModelControllerV3[C <: VersionedModel with Product
if (name != importObject.item.name) {
Future.failed(new IllegalArgumentException(s"URL and payload entity name mismatch: '$name' != '${importObject.item.name}'"))
} else {
versionedModelService.importSingleItemV3(importObject.item, principal.getUsername, importObject.metadata).map {
versionedModelService.importSingleItem(importObject.item, principal.getUsername, importObject.metadata).map {
case Some((entity, validation)) =>
// stripping two last segments, instead of /api-v3/dastasets/dsName/import + /dsName/dsVersion we want /api-v3/dastasets + /dsName/dsVersion
createdWithNameVersionLocationBuilder(entity.name, entity.version, request, stripLastSegments = 2).body(validation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import scala.concurrent.Future
import za.co.absa.enceladus.rest_api.exceptions.NotFoundException

@Service
class AttachmentService @Autowired()(attachmentMongoRepository: AttachmentMongoRepository,
class AttachmentService @Autowired()(val mongoRepository: AttachmentMongoRepository,
schemaMongoRepository: SchemaMongoRepository,
datasetMongoRepository: DatasetMongoRepository,
mappingTableMongoRepository: MappingTableMongoRepository)
extends ModelService(attachmentMongoRepository) {
extends ModelService[MenasAttachment] {

protected val attachmentMongoRepository: AttachmentMongoRepository = mongoRepository // alias

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ import scala.util.{Failure, Success}


@Service
class DatasetService @Autowired()(datasetMongoRepository: DatasetMongoRepository,
class DatasetService @Autowired()(val mongoRepository: DatasetMongoRepository,
oozieRepository: OozieRepository,
propertyDefinitionService: PropertyDefinitionService)
extends VersionedModelService(datasetMongoRepository) {
extends VersionedModelService[Dataset] {

protected val datasetMongoRepository: DatasetMongoRepository = mongoRepository // alias

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import za.co.absa.enceladus.rest_api.repositories.{DatasetMongoRepository, Mappi
import scala.concurrent.Future

@Service
class MappingTableService @Autowired() (mappingTableMongoRepository: MappingTableMongoRepository,
datasetMongoRepository: DatasetMongoRepository) extends VersionedModelService(mappingTableMongoRepository) {
class MappingTableService @Autowired() (val mongoRepository: MappingTableMongoRepository,
datasetMongoRepository: DatasetMongoRepository) extends VersionedModelService[MappingTable] {

protected val mappingTableMongoRepository: MappingTableMongoRepository = mongoRepository // alias

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import za.co.absa.enceladus.rest_api.repositories.MongoRepository

import scala.concurrent.Future

abstract class ModelService[C](mongoRepository: MongoRepository[C]) {
trait ModelService[C] {

def mongoRepository: MongoRepository[C]

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ package za.co.absa.enceladus.rest_api.services

import org.springframework.beans.factory.annotation.{Autowired, Value}
import org.springframework.stereotype.Service
import za.co.absa.enceladus.model.Run
import za.co.absa.enceladus.rest_api.repositories.MonitoringMongoRepository

import scala.concurrent.Future

@Service
class MonitoringService @Autowired()(monitoringMongoRepository: MonitoringMongoRepository)
extends ModelService(monitoringMongoRepository) {
class MonitoringService @Autowired()(val mongoRepository: MonitoringMongoRepository)
extends ModelService[Run] {

import scala.concurrent.ExecutionContext.Implicits.global

def getMonitoringDataPoints(datasetName: String, startDate: String, endDate: String): Future[String] = {
monitoringMongoRepository.getMonitoringDataPoints(datasetName, startDate, endDate).map(_.mkString("[", ",", "]"))
mongoRepository.getMonitoringDataPoints(datasetName, startDate, endDate).map(_.mkString("[", ",", "]"))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import za.co.absa.enceladus.model.properties.PropertyDefinition
import scala.concurrent.Future

@Service("propertyDefinitionService") // by-name qualifier: V2 implementations use the base implementation, not v3
class PropertyDefinitionService @Autowired()(propertyDefMongoRepository: PropertyDefinitionMongoRepository)
extends VersionedModelService(propertyDefMongoRepository) {
class PropertyDefinitionService @Autowired()(val mongoRepository: PropertyDefinitionMongoRepository)
extends VersionedModelService[PropertyDefinition] {

protected val propertyDefMongoRepository: PropertyDefinitionMongoRepository = mongoRepository // alias
import scala.concurrent.ExecutionContext.Implicits.global

override def getUsedIn(name: String, version: Option[Int]): Future[UsedIn] = {
Expand All @@ -44,7 +45,7 @@ class PropertyDefinitionService @Autowired()(propertyDefMongoRepository: Propert
}

def getDistinctCount(): Future[Int] = {
propertyDefMongoRepository.distinctCount()
mongoRepository.distinctCount()
}

override def create(newPropertyDef: PropertyDefinition, username: String): Future[Option[(PropertyDefinition, Validation)]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

@Service
class RunService @Autowired()(runMongoRepository: RunMongoRepository)
extends ModelService(runMongoRepository) {
class RunService @Autowired()(val mongoRepository: RunMongoRepository)
extends ModelService[Run] {

protected val runMongoRepository: RunMongoRepository = mongoRepository // alias

def getRunSummariesPerDatasetName(): Future[Seq[RunDatasetNameGroupedSummary]] = {
runMongoRepository.getRunSummariesPerDatasetName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import za.co.absa.enceladus.rest_api.utils.converters.SparkMenasSchemaConvertor
import scala.concurrent.Future

@Service
class SchemaService @Autowired() (schemaMongoRepository: SchemaMongoRepository,
class SchemaService @Autowired() (val mongoRepository: SchemaMongoRepository,
mappingTableMongoRepository: MappingTableMongoRepository,
datasetMongoRepository: DatasetMongoRepository,
sparkMenasConvertor: SparkMenasSchemaConvertor) extends VersionedModelService(schemaMongoRepository) {
sparkMenasConvertor: SparkMenasSchemaConvertor) extends VersionedModelService[Schema] {

protected val schemaMongoRepository: SchemaMongoRepository = mongoRepository // alias

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import org.slf4j.LoggerFactory
import org.springframework.security.core.context.SecurityContextHolder
import org.springframework.security.core.userdetails.UserDetails
import za.co.absa.enceladus.model.{ModelVersion, Schema, UsedIn, Validation}
import za.co.absa.enceladus.model.menas._
import za.co.absa.enceladus.model.versionedModel.{VersionedModel, VersionedSummary}
import za.co.absa.enceladus.rest_api.exceptions._
import za.co.absa.enceladus.rest_api.repositories.VersionedMongoRepository
Expand All @@ -31,53 +30,51 @@ import com.mongodb.MongoWriteException
import VersionedModelService._

// scalastyle:off number.of.methods
abstract class VersionedModelService[C <: VersionedModel with Product with Auditable[C]]
(versionedMongoRepository: VersionedMongoRepository[C]) extends ModelService(versionedMongoRepository) {
trait VersionedModelService[C <: VersionedModel with Product with Auditable[C]]
extends ModelService[C] {

def mongoRepository: VersionedMongoRepository[C]

import scala.concurrent.ExecutionContext.Implicits.global

private[services] val logger = LoggerFactory.getLogger(this.getClass)

def getLatestVersionsSummarySearch(searchQuery: Option[String]): Future[Seq[VersionedSummary]] = {
versionedMongoRepository.getLatestVersionsSummarySearch(searchQuery)
mongoRepository.getLatestVersionsSummarySearch(searchQuery)
}

def getLatestVersions(): Future[Seq[C]] = {
versionedMongoRepository.getLatestVersions(None)
mongoRepository.getLatestVersions(None)
}

def getSearchSuggestions(): Future[Seq[String]] = {
versionedMongoRepository.getDistinctNamesEnabled()
mongoRepository.getDistinctNamesEnabled()
}

def getVersion(name: String, version: Int): Future[Option[C]] = {
versionedMongoRepository.getVersion(name, version)
mongoRepository.getVersion(name, version)
}

def getAllVersions(name: String): Future[Seq[C]] = {
versionedMongoRepository.getAllVersions(name)
mongoRepository.getAllVersions(name)
}

def getLatestVersion(name: String): Future[Option[C]] = {
versionedMongoRepository.getLatestVersionValue(name).flatMap({
mongoRepository.getLatestVersionValue(name).flatMap({
case Some(version) => getVersion(name, version)
case _ => throw NotFoundException()
})
}

def getLatestVersionNumber(name: String): Future[Int] = {
versionedMongoRepository.getLatestVersionValue(name).flatMap({
mongoRepository.getLatestVersionValue(name).flatMap({
case Some(version) => Future(version)
case _ => throw NotFoundException()
})
}

def getLatestVersionValue(name: String): Future[Option[Int]] = {
versionedMongoRepository.getLatestVersionValue(name)
}

def getLatestVersionSummary(name: String): Future[Option[VersionedSummary]] = {
versionedMongoRepository.getLatestVersionSummary(name)
mongoRepository.getLatestVersionValue(name)
}

def exportSingleItem(name: String, version: Int): Future[String] = {
Expand All @@ -95,7 +92,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
}

// v2 has external validate validation applied only to imports (not create/edits) via validateSingleImport
def importSingleItemV2(item: C, username: String, metadata: Map[String, String]): Future[Option[C]] = {
def importSingleItem(item: C, username: String, metadata: Map[String, String]): Future[Option[(C, Validation)]] = {
for {
validation <- validateSingleImport(item, metadata)
result <- {
Expand All @@ -105,12 +102,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
throw ValidationException(validation)
}
}
} yield result.map(_._1) // v disregards internal common update-based validation
}

// v3 has internal validation on importItem (because it is based on update
def importSingleItemV3(item: C, username: String, metadata: Map[String, String]): Future[Option[(C, Validation)]] = {
importItem(item, username)
} yield result
}

private[services] def validateSingleImport(item: C, metadata: Map[String, String]): Future[Validation] = {
Expand Down Expand Up @@ -152,7 +144,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
for {
versions <- {
//store all in version ascending order
val all = versionedMongoRepository.getAllVersions(name, inclDisabled = true).map(_.sortBy(_.version))
val all = mongoRepository.getAllVersions(name, inclDisabled = true).map(_.sortBy(_.version))
//get those relevant to us
if (fromVersion.isDefined) {
all.map(_.filter(_.version <= fromVersion.get))
Expand Down Expand Up @@ -197,10 +189,6 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit

def getUsedIn(name: String, version: Option[Int]): Future[UsedIn]

private[rest_api] def getMenasRef(item: C): MenasReference = {
MenasReference(Some(versionedMongoRepository.collectionBaseName), item.name, item.version)
}

private[rest_api] def create(item: C, username: String): Future[Option[(C, Validation)]] = {
// individual validations are deliberately not run in parallel - the latter may not be needed if the former fails
for {
Expand All @@ -209,7 +197,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
creationValidation <- validateForCreation(item)
} yield generalValidation.merge(creationValidation)
_ <- if (validation.isValid) {
versionedMongoRepository.create(item, username)
mongoRepository.create(item, username)
.recover {
case e: MongoWriteException =>
throw ValidationException(Validation().withError("name", s"entity with name already exists: '${item.name}'"))
Expand Down Expand Up @@ -247,7 +235,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
throw ValidationException(validation)
}
}
update <- versionedMongoRepository.update(username, transformed)
update <- mongoRepository.update(username, transformed)
.recover {
case e: MongoWriteException =>
throw ValidationException(Validation().withError("version", s"entity '$itemName' with this version already exists: ${itemVersion + 1}"))
Expand All @@ -263,21 +251,6 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
}
}

def findRefEqual(refNameCol: String, refVersionCol: String, name: String, version: Option[Int]): Future[Seq[MenasReference]] = {
versionedMongoRepository.findRefEqual(refNameCol, refVersionCol, name, version)
}

/**
* Enables all versions of the entity by name.
* @param name
*/
def enableEntity(name: String): Future[UpdateResult] = {
val auth = SecurityContextHolder.getContext.getAuthentication
val principal = auth.getPrincipal.asInstanceOf[UserDetails]

versionedMongoRepository.enableAllVersions(name, principal.getUsername)
}

def disableVersion(name: String, version: Option[Int]): Future[UpdateResult] = {
val auth = SecurityContextHolder.getContext.getAuthentication
val principal = auth.getPrincipal.asInstanceOf[UserDetails]
Expand All @@ -291,29 +264,12 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
if (usedIn.nonEmpty) {
throw EntityInUseException(usedIn)
} else {
versionedMongoRepository.disableVersion(name, version, principal.getUsername)
mongoRepository.disableVersion(name, version, principal.getUsername)
}
}

def isDisabled(name: String): Future[Boolean] = {
versionedMongoRepository.isDisabled(name)
}

/**
* Retrieves model@version and calls
* [[za.co.absa.enceladus.rest_api.services.VersionedModelService#validate(java.lang.Object)]]
*
* In order to extend this behavior, override the mentioned method instead. (that's why this is `final`)
*
* @param name
* @param version
* @return
*/
final def validate(name: String, version: Int): Future[Validation] = {
getVersion(name, version).flatMap({
case Some(entity) => validate(entity)
case _ => Future.failed(NotFoundException(s"Entity by name=$name, version=$version is not found!"))
})
mongoRepository.isDisabled(name)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DatasetServiceV3 @Autowired()(datasetMongoRepository: DatasetMongoReposito
mappingTableService: MappingTableServiceV3,
val schemaService: SchemaServiceV3)
extends DatasetService(datasetMongoRepository, oozieRepository, propertyDefinitionService)
with HavingSchemaService {
with HavingSchemaService with VersionedModelServiceV3[Dataset] {

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import scala.concurrent.Future
class MappingTableServiceV3 @Autowired()(mappingTableMongoRepository: MappingTableMongoRepository,
datasetMongoRepository: DatasetMongoRepository,
val schemaService: SchemaServiceV3)
extends MappingTableService(mappingTableMongoRepository, datasetMongoRepository) with HavingSchemaService {
extends MappingTableService(mappingTableMongoRepository, datasetMongoRepository) with HavingSchemaService
with VersionedModelServiceV3[MappingTable] {

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.concurrent.Future
@Service
class PropertyDefinitionServiceV3 @Autowired()(propertyDefMongoRepository: PropertyDefinitionMongoRepository,
datasetMongoRepository: DatasetMongoRepository)
extends PropertyDefinitionService(propertyDefMongoRepository) {
extends PropertyDefinitionService(propertyDefMongoRepository) with VersionedModelServiceV3[PropertyDefinition] {

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class SchemaServiceV3 @Autowired()(schemaMongoRepository: SchemaMongoRepository,
mappingTableMongoRepository: MappingTableMongoRepository,
datasetMongoRepository: DatasetMongoRepository,
sparkMenasConvertor: SparkMenasSchemaConvertor)
extends SchemaService(schemaMongoRepository, mappingTableMongoRepository, datasetMongoRepository, sparkMenasConvertor) {
extends SchemaService(schemaMongoRepository, mappingTableMongoRepository, datasetMongoRepository, sparkMenasConvertor)
with VersionedModelServiceV3[Schema]{

import scala.concurrent.ExecutionContext.Implicits.global

Expand Down
Loading

0 comments on commit fe56a6c

Please sign in to comment.