Skip to content

Commit

Permalink
#1371 Reassigned performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian-Olosutean committed Jul 28, 2020
1 parent cc4ef03 commit f0d53a3
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,16 @@ trait CommonJobExecution {
val reportVersion = getReportVersion(cmd, dataset)
val pathCfg = getPathConfig(cmd, dataset, reportVersion)

val inputPath = getInputPath(pathCfg)

validateOutputPath(fsUtils, pathCfg)

val performance = initPerformanceMeasurer(inputPath)

// Enable Spline
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()

// Enable non-default persistence storage level if provided in the command line
cmd.persistStorageLevel.foreach(Atum.setCachingStorageLevel)

PreparationResult(dataset, reportVersion, pathCfg, performance)
PreparationResult(dataset, reportVersion, pathCfg, new PerformanceMeasurer(spark.sparkContext.appName))
}

protected def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit
Expand All @@ -111,8 +107,6 @@ trait CommonJobExecution {
}
}

protected def getInputPath[T](pathCfg: PathConfig): String

protected def runPostProcessing[T](sourcePhase: SourcePhase, preparationResult: PreparationResult, jobCmdConfig: JobConfigParser[T])
(implicit spark: SparkSession, fileSystemVersionUtils: FileSystemVersionUtils): Unit = {
val outputPath = sourcePhase match {
Expand Down Expand Up @@ -247,12 +241,4 @@ trait CommonJobExecution {
newVersion
}
}

private def initPerformanceMeasurer(path: String)
(implicit spark: SparkSession, fsUtils: FileSystemVersionUtils): PerformanceMeasurer = {
val performance = new PerformanceMeasurer(spark.sparkContext.appName)
val stdDirSize = fsUtils.getDirectorySize(path)
performance.startMeasurement(stdDirSize)
performance
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ trait ConformanceExecution extends CommonJobExecution {
cmd: ConformanceConfigParser[T],
fsUtils: FileSystemVersionUtils,
spark: SparkSession): Unit = {
val stdDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.standardizationPath)
preparationResult.performance.startMeasurement(stdDirSize)

log.info(s"standardization path: ${preparationResult.pathCfg.standardizationPath}")
log.info(s"publish path: ${preparationResult.pathCfg.publishPath}")

Expand Down Expand Up @@ -91,8 +94,6 @@ trait ConformanceExecution extends CommonJobExecution {
validateIfPathAlreadyExists(fsUtils, pathConfig.publishPath)
}

override def getInputPath[T](pathCfg: PathConfig): String = pathCfg.standardizationPath

protected def readConformanceInputData(pathCfg: PathConfig)(implicit spark: SparkSession): DataFrame = {
spark.read.parquet(pathCfg.standardizationPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import za.co.absa.enceladus.standardization.interpreter.StandardizationInterpret
import za.co.absa.enceladus.standardization.interpreter.stages.PlainSchemaGenerator
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.utils.performance.PerformanceMetricTools
import za.co.absa.enceladus.utils.performance.{PerformanceMeasurer, PerformanceMetricTools}
import za.co.absa.enceladus.utils.schema.{MetadataKeys, SchemaUtils, SparkUtils}
import za.co.absa.enceladus.utils.udf.UDFLibrary
import za.co.absa.enceladus.utils.validation.ValidationException
Expand All @@ -52,6 +52,8 @@ trait StandardizationExecution extends CommonJobExecution {
fsUtils: FileSystemVersionUtils,
spark: SparkSession): StructType = {

val stdDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.rawPath)
preparationResult.performance.startMeasurement(stdDirSize)
// Enable Control Framework
import za.co.absa.atum.AtumImplicits.SparkSessionWrapper
spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.rawPath}/_INFO")
Expand Down Expand Up @@ -94,8 +96,6 @@ trait StandardizationExecution extends CommonJobExecution {
}
}

override def getInputPath[T](pathCfg: PathConfig): String = pathCfg.rawPath

override def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit = {
validateIfPathAlreadyExists(fsUtils: FileSystemVersionUtils, pathConfig.standardizationPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,4 @@ trait StandardizationAndConformanceExecution extends StandardizationExecution
validateIfPathAlreadyExists(fsUtils, pathConfig.standardizationPath)
validateIfPathAlreadyExists(fsUtils, pathConfig.publishPath)
}

override def getInputPath[T](pathCfg: PathConfig): String = super[StandardizationExecution].getInputPath(pathCfg)
}

0 comments on commit f0d53a3

Please sign in to comment.