Skip to content

Commit

Permalink
1868 statistics with missing counts and datasets missing proprties (#…
Browse files Browse the repository at this point in the history
…1873)

* 1868 statistics with missing counts and datasets missing proprties
  • Loading branch information
AdrianOlosutean authored Aug 31, 2021
1 parent a4998ad commit c96f779
Show file tree
Hide file tree
Showing 13 changed files with 318 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.model.properties

import za.co.absa.enceladus.model.properties.essentiality.Essentiality

case class PropertyDefinitionStats(name: String,
version: Int = 1,
essentiality: Essentiality = Essentiality.Optional,
missingInDatasetsCount: Int = 0)

object PropertyDefinitionStats {
def apply(propertyDefinition: PropertyDefinition, missingCounts: Int): PropertyDefinitionStats = {
PropertyDefinitionStats(propertyDefinition.name, propertyDefinition.version,
propertyDefinition.essentiality, missingCounts)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.net.URI
import java.util
import java.util.Optional
import java.util.concurrent.CompletableFuture

import org.slf4j.{Logger, LoggerFactory}
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.{HttpStatus, ResponseEntity}
Expand All @@ -29,6 +30,7 @@ import za.co.absa.enceladus.menas.services.DatasetService
import za.co.absa.enceladus.utils.validation.ValidationLevel.ValidationLevel
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.model.properties.PropertyDefinition
import za.co.absa.enceladus.model.versionedModel.VersionedSummary
import za.co.absa.enceladus.model.{Dataset, Validation}
import za.co.absa.enceladus.utils.validation.ValidationLevel.Constants.DefaultValidationLevelName

Expand All @@ -44,6 +46,14 @@ class DatasetController @Autowired()(datasetService: DatasetService)

import scala.concurrent.ExecutionContext.Implicits.global

@GetMapping(Array("/latest"))
@ResponseStatus(HttpStatus.OK)
def getLatestVersions(@RequestParam(value = "missing_property", required = false)
missingProperty: Optional[String]): CompletableFuture[Seq[VersionedSummary]] = {
datasetService.getLatestVersions(missingProperty.toScalaOption)
.map(datasets => datasets.map(dataset => VersionedSummary(dataset.name, dataset.version)))
}

@PostMapping(Array("/{datasetName}/rule/create"))
@ResponseStatus(HttpStatus.OK)
def addConformanceRule(@AuthenticationPrincipal user: UserDetails,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ package za.co.absa.enceladus.menas.controllers
import java.util.concurrent.CompletableFuture

import scala.concurrent.Future

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

import za.co.absa.enceladus.menas.models.LandingPageInformation
import za.co.absa.enceladus.menas.repositories.DatasetMongoRepository
import za.co.absa.enceladus.menas.repositories.LandingPageStatisticsMongoRepository
import za.co.absa.enceladus.menas.repositories.MappingTableMongoRepository
import za.co.absa.enceladus.menas.repositories.SchemaMongoRepository
import za.co.absa.enceladus.menas.services.RunService
import za.co.absa.enceladus.menas.services.{PropertyDefinitionService, RunService, StatisticsService}
import za.co.absa.enceladus.model.properties.essentiality.{Mandatory, Recommended}

@RestController
@RequestMapping(Array("/api/landing"))
class LandingPageController @Autowired() (datasetRepository: DatasetMongoRepository,
mappingTableRepository: MappingTableMongoRepository,
schemaRepository: SchemaMongoRepository,
runsService: RunService,
landingPageRepository: LandingPageStatisticsMongoRepository) extends BaseController {
landingPageRepository: LandingPageStatisticsMongoRepository,
statisticsService: StatisticsService) extends BaseController {

import scala.concurrent.ExecutionContext.Implicits.global
import za.co.absa.enceladus.menas.utils.implicits._
Expand All @@ -50,13 +50,31 @@ class LandingPageController @Autowired() (datasetRepository: DatasetMongoReposit
}

def landingPageInfo(): Future[LandingPageInformation] = {
val dsCountFuture = datasetRepository.distinctCount()
val mappingTableFuture = mappingTableRepository.distinctCount()
val schemaFuture = schemaRepository.distinctCount()
val runFuture = runsService.getCount()
val propertiesWithMissingCountsFuture = statisticsService.getPropertiesWithMissingCount()
val propertiesTotalsFuture: Future[(Int, Int, Int)] = propertiesWithMissingCountsFuture.map(props => {
props.foldLeft(0, 0, 0) { (acum, item) =>
val (count, mandatoryCount, recommendedCount) = acum
item.essentiality match {
case Mandatory(_) => (count + 1, mandatoryCount + item.missingInDatasetsCount, recommendedCount)
case Recommended() => (count + 1, mandatoryCount, recommendedCount + item.missingInDatasetsCount)
case _ => (count + 1, mandatoryCount, recommendedCount)
}
}
})
val todaysStatsfuture = runsService.getTodaysRunsStatistics()
for {
dsCount <- datasetRepository.distinctCount()
mtCount <- mappingTableRepository.distinctCount()
schemaCount <- schemaRepository.distinctCount()
runCount <- runsService.getCount()
todaysStats <- runsService.getTodaysRunsStatistics()
} yield LandingPageInformation(dsCount, mtCount, schemaCount, runCount, todaysStats)
dsCount <- dsCountFuture
mtCount <- mappingTableFuture
schemaCount <- schemaFuture
runCount <- runFuture
(propertiesCount, totalMissingMandatoryProperties, totalMissingRecommendedProperties) <- propertiesTotalsFuture
todaysStats <- todaysStatsfuture
} yield LandingPageInformation(dsCount, mtCount, schemaCount, runCount, propertiesCount,
totalMissingMandatoryProperties, totalMissingRecommendedProperties, todaysStats)
}

// scalastyle:off magic.number
Expand All @@ -67,6 +85,6 @@ class LandingPageController @Autowired() (datasetRepository: DatasetMongoReposit
for {
newStats <- landingPageInfo()
res <- landingPageRepository.updateStatistics(newStats)
} yield res
} yield res
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.menas.controllers

import java.util.concurrent.CompletableFuture

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.{GetMapping, RequestMapping, RestController}
import za.co.absa.enceladus.menas.services.StatisticsService
import za.co.absa.enceladus.model.properties.PropertyDefinitionStats

@RestController
@RequestMapping(Array("/api/statistics"))
class StatisticsController @Autowired() (statisticsService: StatisticsService) extends BaseController {

import za.co.absa.enceladus.menas.utils.implicits._

@GetMapping(Array("/properties/missing"))
def getPropertiesWithMissingCount(): CompletableFuture[Seq[PropertyDefinitionStats]] = {
statisticsService.getPropertiesWithMissingCount()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ case class LandingPageInformation(
totalNumberMappingTables: Int,
totalNumberSchemas: Int,
totalNumberRuns: Long,
totalNumberProperties: Int,
totalNumberMissingMandatoryProperties: Int,
totalNumberMissingRecommendedProperties: Int,
todaysRunsStatistics: TodaysRunsStatistics)
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,10 @@ abstract class VersionedMongoRepository[C <: VersionedModel](mongoDb: MongoDatab
collection.aggregate[VersionedSummary](pipeline).toFuture()
}

def getLatestVersions(): Future[Seq[C]] = {
// there may be a way to this using mongo-joining (aggregation.lookup) instead
getLatestVersionsSummary(None).flatMap { summaries =>
val resultIn = summaries.map { summary =>
getVersion(summary._id, summary.latestVersion).map(_.toSeq)
}

Future.sequence(resultIn).map(_.flatten)
}
def getLatestVersions(missingProperty: Option[String]): Future[Seq[C]] = {
val missingFilter = missingProperty.map(missingProp =>
Filters.not(Filters.exists(s"properties.$missingProp")))
collectLatestVersions(missingFilter)
}

def getVersion(name: String, version: Int): Future[Option[C]] = {
Expand Down Expand Up @@ -163,6 +158,18 @@ abstract class VersionedMongoRepository[C <: VersionedModel](mongoDb: MongoDatab
.toFuture()
}

private def collectLatestVersions(postAggFilter: Option[Bson]): Future[Seq[C]] = {
val pipeline = Seq(
filter(Filters.notEqual("disabled", true)),
Aggregates.group("$name",
Accumulators.max("latestVersion", "$version"),
Accumulators.last("doc","$$ROOT")),
Aggregates.replaceRoot("$doc")) ++
postAggFilter.map(Aggregates.filter)

collection.aggregate[C](pipeline).toFuture()
}

private[repositories] def getNotDisabledFilter: Bson = {
notEqual("disabled", true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,21 @@

package za.co.absa.enceladus.menas.services

import scala.concurrent.Future
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import za.co.absa.enceladus.menas.repositories.DatasetMongoRepository
import za.co.absa.enceladus.menas.repositories.OozieRepository
import za.co.absa.enceladus.model.{Dataset, Schema, UsedIn, Validation}
import za.co.absa.enceladus.menas.repositories.{DatasetMongoRepository, OozieRepository}
import za.co.absa.enceladus.menas.services.DatasetService.{RuleValidationsAndFields, _}
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, _}
import za.co.absa.enceladus.model.menas.scheduler.oozie.OozieScheduleInstance

import scala.language.reflectiveCalls
import DatasetService.RuleValidationsAndFields
import za.co.absa.enceladus.utils.validation.ValidationLevel.ValidationLevel
import za.co.absa.enceladus.model.properties.PropertyDefinition
import za.co.absa.enceladus.model.properties.essentiality.Essentiality._
import za.co.absa.enceladus.model.properties.essentiality.Mandatory
import za.co.absa.enceladus.model.{Dataset, Schema, UsedIn, Validation}
import za.co.absa.enceladus.utils.validation.ValidationLevel
import DatasetService._
import za.co.absa.enceladus.utils.validation.ValidationLevel.ValidationLevel

import scala.concurrent.Future
import scala.language.reflectiveCalls
import scala.util.{Failure, Success}


Expand Down Expand Up @@ -217,6 +214,9 @@ class DatasetService @Autowired()(datasetMongoRepository: DatasetMongoRepository
}
}

def getLatestVersions(missingProperty: Option[String]): Future[Seq[Dataset]] =
datasetMongoRepository.getLatestVersions(missingProperty)

override def importItem(item: Dataset, username: String): Future[Option[Dataset]] = {
getLatestVersionValue(item.name).flatMap {
case Some(version) => update(username, item.copy(version = version))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ package za.co.absa.enceladus.menas.services

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import za.co.absa.enceladus.menas.repositories.{DatasetMongoRepository, PropertyDefinitionMongoRepository}
import za.co.absa.enceladus.menas.utils.converters.SparkMenasSchemaConvertor
import za.co.absa.enceladus.menas.repositories.PropertyDefinitionMongoRepository
import za.co.absa.enceladus.model.UsedIn
import za.co.absa.enceladus.model.properties.PropertyDefinition

Expand All @@ -42,6 +41,10 @@ class PropertyDefinitionService @Autowired()(propertyDefMongoRepository: Propert
}
}

def getDistinctCount(): Future[Int] = {
propertyDefMongoRepository.distinctCount()
}

override def create(newPropertyDef: PropertyDefinition, username: String): Future[Option[PropertyDefinition]] = {
val propertyDefBase = PropertyDefinition(
name = newPropertyDef.name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.menas.services

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import za.co.absa.enceladus.model.properties.{PropertyDefinition, PropertyDefinitionStats}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

@Component
class StatisticsService @Autowired() (propertyDefService: PropertyDefinitionService, datasetService: DatasetService){
//#TODO find optimizations #1897
def getPropertiesWithMissingCount(): Future[Seq[PropertyDefinitionStats]] = {
val propertyDefsFuture = propertyDefService.getLatestVersions()
propertyDefsFuture
.map { (props: Seq[PropertyDefinition]) =>
val propertiesWithMissingCounts: Seq[Future[PropertyDefinitionStats]] = props.map(propertyDef =>
datasetService
.getLatestVersions(Some(propertyDef.name))
.map(datasetsMissingProp =>
PropertyDefinitionStats(propertyDef, datasetsMissingProp.size))
)
propertiesWithMissingCounts
}
.flatMap { propertiesWithMissingCounts: Seq[Future[PropertyDefinitionStats]] =>
Future.sequence(propertiesWithMissingCounts)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit
}

def getLatestVersions(): Future[Seq[C]] = {
versionedMongoRepository.getLatestVersions()
versionedMongoRepository.getLatestVersions(None)
}

def getSearchSuggestions(): Future[Seq[String]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class PropertyDefinitionApiIntegrationSuite extends BaseRestApiTest with BeforeA
val response = sendGet[Array[PropertyDefinition]](s"$apiUrl") // Array to avoid erasure
assertOk(response)

val responseData = response.getBody.toSeq.map(pd => (pd.name, pd.version))
val responseData = response.getBody.toSeq.map(pd => (pd.name, pd.version)).sortBy(_._1)
val expectedData = Seq("propertyDefinitionA" -> 2, "propertyDefinitionB" -> 3) // disabled pdA-v3 not reported
assert(responseData == expectedData)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.springframework.test.context.junit4.SpringRunner
import za.co.absa.enceladus.menas.exceptions.EntityAlreadyExistsException
import za.co.absa.enceladus.menas.integration.fixtures.{DatasetFixtureService, FixtureService}
import za.co.absa.enceladus.menas.repositories.DatasetMongoRepository
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.test.factories.DatasetFactory
import za.co.absa.enceladus.model.menas.scheduler.oozie.OozieSchedule
Expand Down Expand Up @@ -512,6 +513,27 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
val expected = Seq(dataset3, dataset4).map(DatasetFactory.toSummary)
assert(actual == expected)
}

"search with missing properties" in {
val dataset1ver1 = DatasetFactory.getDummyDataset(name = "dataset1", version = 1)
val dataset1ver2 = DatasetFactory.getDummyDataset(name = "dataset1", version = 2,
properties = Some(Map("prop1"->"a")))
val dataset2ver1 = DatasetFactory.getDummyDataset(name = "dataset2", version = 1)
val dataset2ver2 = DatasetFactory.getDummyDataset(name = "dataset2", version = 2)
val dataset3ver1 = DatasetFactory.getDummyDataset(name = "dataset3", version = 1)
val dataset4ver1 = DatasetFactory.getDummyDataset(name = "dataset4", version = 1,
properties = Some(Map("prop1"->"A")))

val abc1 = DatasetFactory.getDummyDataset(name = "abc", version = 1)

datasetFixture.add(dataset1ver1, dataset1ver2, dataset2ver1, dataset2ver2, dataset3ver1, dataset4ver1, abc1)

val actual: Seq[Dataset] = await(datasetMongoRepository.getLatestVersions(Some("prop1")))
.sortBy(_.name)

val expected = Seq(abc1, dataset2ver2, dataset3ver1)
assert(actual == expected)
}
}

"return all datasets" when {
Expand Down Expand Up @@ -595,7 +617,7 @@ class DatasetRepositoryIntegrationSuite extends BaseRepositoryTest {
assert(await(datasetMongoRepository.findByCoordId("SomeCoordId")) == Seq())
}
}
"return datasets witch matching coordinator ID" when {
"return datasets with matching coordinator ID" when {
"such datasets exist" in {
val schedule = OozieSchedule(scheduleTiming = ScheduleTiming(Seq(), Seq(), Seq(), Seq(), Seq()),
runtimeParams = RuntimeConfig(sysUser = "user", menasKeytabFile = "/a/b/c"), datasetVersion = 0,
Expand Down
Loading

0 comments on commit c96f779

Please sign in to comment.