Commit

#1371 Moved jar assignment
Adrian-Olosutean committed Jul 24, 2020
1 parent 96bcacc commit c7cc534
Showing 10 changed files with 136 additions and 74 deletions.
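Most of the diff below replaces the hard-coded input/output path selection in CommonJobExecution with abstract hooks (getPathConfig, getInputPath, validateOutputPath) that each concrete execution trait overrides. A minimal, self-contained sketch of that shape, with stubbed types and illustrative bodies rather than the project's real implementations:

// Stub of the path container threaded through the jobs.
case class PathConfig(rawPath: String, standardizationPath: String, publishPath: String)

trait CommonJobExecution {
  // Hooks each job type provides.
  protected def getPathConfig(reportVersion: Int): PathConfig
  protected def getInputPath(pathCfg: PathConfig): String
  protected def validateOutputPath(pathCfg: PathConfig): Unit

  // Shared helper, analogous to validateIfPathAlreadyExists in the diff
  // (a local-file check stands in here for the HDFS lookup).
  protected def failIfExists(path: String): Unit =
    if (new java.io.File(path).exists())
      throw new IllegalStateException(s"Path $path already exists")
}

trait ConformanceExecution extends CommonJobExecution {
  // Conformance reads standardized data and must not overwrite the publish path.
  override protected def getInputPath(pathCfg: PathConfig): String = pathCfg.standardizationPath
  override protected def validateOutputPath(pathCfg: PathConfig): Unit = failIfExists(pathCfg.publishPath)
}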
CommonJobExecution.scala
@@ -88,13 +88,9 @@ trait CommonJobExecution {
val reportVersion = getReportVersion(cmd, dataset)
val pathCfg = getPathConfig(cmd, dataset, reportVersion)

val (inputPath, outputPath) = getInputOutputPaths(cmd, fsUtils, pathCfg)
val inputPath = getInputPath(pathCfg)

log.info(s"input path: $inputPath")
log.info(s"output path: $outputPath")

// die if the output path exists
validateForExistingOutputPath(fsUtils, outputPath)
validateOutputPath(fsUtils, pathCfg)

val performance = initPerformanceMeasurer(inputPath)

@@ -108,19 +104,18 @@ trait CommonJobExecution {
PreparationResult(dataset, reportVersion, pathCfg, performance)
}

private def getInputOutputPaths[T](cmd: JobConfigParser[T], fsUtils: FileSystemVersionUtils, pathCfg: PathConfig): (String, String) = {
cmd match {
case _: StandardizationConfig => (pathCfg.rawPath, pathCfg.standardizationPath)
case _: ConformanceConfig => (pathCfg.standardizationPath, pathCfg.publishPath)
case _ => {
val intermediatePath = pathCfg.standardizationPath
log.info(s"standardization path: $intermediatePath")
validateForExistingOutputPath(fsUtils, intermediatePath)
(pathCfg.rawPath, pathCfg.publishPath)
}
protected def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit

protected def validateIfPathAlreadyExists(fsUtils: FileSystemVersionUtils, path: String): Unit = {
if (fsUtils.hdfsExists(path)) {
throw new IllegalStateException(
s"Path $path already exists. Increment the run version, or delete $path"
)
}
}

protected def getInputPath[T](pathCfg: PathConfig): String

protected def runPostProcessing[T](sourcePhase: SourcePhase, preparationResult: PreparationResult, jobCmdConfig: JobConfigParser[T])
(implicit spark: SparkSession, fileSystemVersionUtils: FileSystemVersionUtils): Unit = {
val outputPath = sourcePhase match {
@@ -166,44 +161,37 @@ trait CommonJobExecution {
}
}

protected def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
protected def getDefaultPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
PathConfig(
rawPath = buildRawPath(cmd.asInstanceOf[StandardizationConfigParser[StandardizationConformanceConfig]], dataset, reportVersion),
publishPath = buildPublishPath(cmd.asInstanceOf[ConformanceConfigParser[StandardizationConformanceConfig]], dataset, reportVersion),
rawPath = buildRawPath(cmd, dataset, reportVersion),
publishPath = buildPublishPath(cmd, dataset, reportVersion),
standardizationPath = getStandardizationPath(cmd, reportVersion)
)
}

def buildPublishPath[T](cmd: ConformanceConfigParser[T],
ds: Dataset,
reportVersion: Int): String = {
protected def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig

private def buildPublishPath[T](cmd: JobConfigParser[T], ds: Dataset, reportVersion: Int): String = {
val infoDateCol: String = InfoDateColumn
val infoVersionCol: String = InfoVersionColumn

(cmd.publishPathOverride, cmd.folderPrefix) match {
case (None, None) =>
s"${ds.hdfsPublishPath}/$infoDateCol=${cmd.reportDate}/$infoVersionCol=$reportVersion"
case (None, Some(folderPrefix)) =>
cmd.folderPrefix match {
case None => s"${ds.hdfsPublishPath}/$infoDateCol=${cmd.reportDate}/$infoVersionCol=$reportVersion"
case Some(folderPrefix) =>
s"${ds.hdfsPublishPath}/$folderPrefix/$infoDateCol=${cmd.reportDate}/$infoVersionCol=$reportVersion"
case (Some(publishPathOverride), _) =>
publishPathOverride
}
}

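A quick worked example of the publish path shape (all values hypothetical; the column names are assumptions standing in for the InfoDateColumn/InfoVersionColumn constants):

val hdfsPublishPath = "/publish/mydataset"        // ds.hdfsPublishPath
val infoDateCol     = "enceladus_info_date"       // assumed value of InfoDateColumn
val infoVersionCol  = "enceladus_info_version"    // assumed value of InfoVersionColumn
val reportDate      = "2020-07-24"
val reportVersion   = 3
println(s"$hdfsPublishPath/$infoDateCol=$reportDate/$infoVersionCol=$reportVersion")
// /publish/mydataset/enceladus_info_date=2020-07-24/enceladus_info_version=3
// With folderPrefix = Some("snapshots") the prefix is inserted after the publish root;
// with publishPathOverride set, the override is returned as-is.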
def buildRawPath[T](cmd: StandardizationConfigParser[T], dataset: Dataset, reportVersion: Int): String = {
private def buildRawPath[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): String = {
val dateTokens = cmd.reportDate.split("-")
cmd.rawPathOverride match {
case None =>
val folderSuffix = s"/${dateTokens(0)}/${dateTokens(1)}/${dateTokens(2)}/v$reportVersion"
cmd.folderPrefix match {
case None => s"${dataset.hdfsPath}$folderSuffix"
case Some(folderPrefix) => s"${dataset.hdfsPath}/$folderPrefix$folderSuffix"
}
case Some(rawPathOverride) => rawPathOverride
val folderSuffix = s"/${dateTokens(0)}/${dateTokens(1)}/${dateTokens(2)}/v$reportVersion"
cmd.folderPrefix match {
case None => s"${dataset.hdfsPath}$folderSuffix"
case Some(folderPrefix) => s"${dataset.hdfsPath}/$folderPrefix$folderSuffix"
}
}

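And the matching raw path, again with made-up values:

val hdfsPath = "/raw/mydataset"                        // dataset.hdfsPath
val Array(year, month, day) = "2020-07-24".split("-")  // cmd.reportDate
val reportVersion = 3
val folderSuffix = s"/$year/$month/$day/v$reportVersion"
println(s"$hdfsPath$folderSuffix")                     // folderPrefix = None
// /raw/mydataset/2020/07/24/v3
println(s"$hdfsPath/incoming$folderSuffix")            // folderPrefix = Some("incoming")
// /raw/mydataset/incoming/2020/07/24/v3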
protected def getStandardizationPath[T](jobConfig: JobConfigParser[T], reportVersion: Int): String = {
private def getStandardizationPath[T](jobConfig: JobConfigParser[T], reportVersion: Int): String = {
MessageFormat.format(conf.getString("standardized.hdfs.path"),
jobConfig.datasetName,
jobConfig.datasetVersion.toString,
@@ -226,14 +214,6 @@
}
}

protected def validateForExistingOutputPath(fsUtils: FileSystemVersionUtils, path: String): Unit = {
if (fsUtils.hdfsExists(path)) {
throw new IllegalStateException(
s"Path $path already exists. Increment the run version, or delete $path"
)
}
}

protected def writePerformanceMetrics[T](performance: PerformanceMeasurer, jobCmdConfig: JobConfigParser[T]): Unit = {
jobCmdConfig.performanceMetricsFile.foreach(fileName => try {
performance.writeMetricsToFile(fileName)
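The getStandardizationPath helper above builds the intermediate path with java.text.MessageFormat from the standardized.hdfs.path configuration key. A standalone illustration, with a hypothetical pattern value (the real one is read from the job configuration):

import java.text.MessageFormat

val pattern = "/std/{0}/{1}/{2}/{3}"   // made-up stand-in for conf.getString("standardized.hdfs.path")
println(MessageFormat.format(pattern, "mydataset", "2", "2020-07-24", "3"))
// /std/mydataset/2/2020-07-24/3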
ConformanceExecution.scala
@@ -33,6 +33,7 @@ import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, Feature
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasCredentials
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.standardization.config.StandardizationConfig
import za.co.absa.enceladus.standardization_conformance.StandardizationAndConformanceJob
import za.co.absa.enceladus.standardization_conformance.config.StandardizationConformanceConfig
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
@@ -53,6 +54,9 @@ trait ConformanceExecution extends CommonJobExecution {
cmd: ConformanceConfigParser[T],
fsUtils: FileSystemVersionUtils,
spark: SparkSession): Unit = {
log.info(s"standardization path: ${preparationResult.pathCfg.standardizationPath}")
log.info(s"publish path: ${preparationResult.pathCfg.publishPath}")

// Enable Control Framework
import za.co.absa.atum.AtumImplicits.SparkSessionWrapper

@@ -77,6 +81,21 @@
preparationResult.reportVersion)
}

override def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
val pathOverride = cmd.asInstanceOf[ConformanceConfig].publishPathOverride
val initialConfig = getDefaultPathConfig(cmd, dataset, reportVersion)
pathOverride match {
case None => initialConfig
case Some(providedRawPath) => initialConfig.copy(publishPath = providedRawPath)
}
}

override def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit = {
validateIfPathAlreadyExists(fsUtils, pathConfig.publishPath)
}

override def getInputPath[T](pathCfg: PathConfig): String = pathCfg.standardizationPath

protected def readConformanceInputData(pathCfg: PathConfig)(implicit spark: SparkSession): DataFrame = {
spark.read.parquet(pathCfg.standardizationPath)
}
DynamicConformanceJob.scala
@@ -23,7 +23,7 @@ import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.modules.SourcePhase

object DynamicConformanceJob extends ConformanceExecution {
private val jobName: String = "Dynamic Conformance"
private val jobName: String = "Enceladus Conformance"

def main(args: Array[String]) {
// This should be the first thing the app does to make secure Kafka work with our CA.
CoalesceRuleInterpreter.scala
@@ -18,7 +18,7 @@ package za.co.absa.enceladus.conformance.interpreter.rules
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, RuleValidators}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{CoalesceConformanceRule, ConformanceRule}
import za.co.absa.spark.hats.Extensions._
@@ -33,7 +33,8 @@ case class CoalesceRuleInterpreter(rule: CoalesceConformanceRule) extends RuleIn
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO,
progArgs: InterpreterContextArgs): Dataset[Row] = {
// Validate the rule parameters
RuleValidators.validateSameParent(progArgs.datasetName, ruleName, rule.inputColumns :+ rule.outputColumn: _*)

StandardizationExecution.scala
@@ -29,7 +29,7 @@ import za.co.absa.enceladus.common.{CommonJobExecution, Constants}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasCredentials
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.standardization.config.StandardizationConfigParser
import za.co.absa.enceladus.standardization.config.{StandardizationConfig, StandardizationConfigParser}
import za.co.absa.enceladus.standardization.interpreter.StandardizationInterpreter
import za.co.absa.enceladus.standardization.interpreter.stages.PlainSchemaGenerator
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
@@ -57,6 +57,9 @@ trait StandardizationExecution extends CommonJobExecution {
spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.rawPath}/_INFO")
.setControlMeasuresWorkflow(sourceId.toString)

log.info(s"raw path: ${preparationResult.pathCfg.rawPath}")
log.info(s"standardization path: ${preparationResult.pathCfg.standardizationPath}")

// Enable control framework performance optimization for pipeline-like jobs
Atum.setAllowUnpersistOldDatasets(true)

@@ -83,6 +86,21 @@
dao.getSchema(preparationResult.dataset.schemaName, preparationResult.dataset.schemaVersion)
}

override def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
val pathOverride = cmd.asInstanceOf[StandardizationConfig].rawPathOverride
val initialConfig = getDefaultPathConfig(cmd, dataset, reportVersion)
pathOverride match {
case None => initialConfig
case Some(providedRawPath) => initialConfig.copy(rawPath = providedRawPath)
}
}

override def getInputPath[T](pathCfg: PathConfig): String = pathCfg.rawPath

override def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit = {
validateIfPathAlreadyExists(fsUtils: FileSystemVersionUtils, pathConfig.standardizationPath)
}

protected def readStandardizationInputData[T](schema: StructType,
cmd: StandardizationConfigParser[T],
path: String,
@@ -163,25 +181,24 @@
handleEmptyOutput(sourceId)
}

val outputPath = preparationResult.pathCfg.standardizationPath
standardizedDF.write.parquet(outputPath)
standardizedDF.write.parquet(preparationResult.pathCfg.standardizationPath)
// Store performance metrics
// (record count, directory sizes, elapsed time, etc. to _INFO file metadata and performance file)
val stdDirSize = fsUtils.getDirectorySize(outputPath)
val stdDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.standardizationPath)
preparationResult.performance.finishMeasurement(stdDirSize, recordCount)
PerformanceMetricTools.addPerformanceMetricsToAtumMetadata(
spark,
"std",
preparationResult.pathCfg.rawPath,
outputPath,
preparationResult.pathCfg.standardizationPath,
menasCredentials.username,
args.mkString(" ")
)

cmd.rowTag.foreach(rowTag => Atum.setAdditionalInfo("xml_row_tag" -> rowTag))
cmd.csvDelimiter.foreach(delimiter => Atum.setAdditionalInfo("csv_delimiter" -> delimiter))

standardizedDF.writeInfoFile(outputPath)
standardizedDF.writeInfoFile(preparationResult.pathCfg.standardizationPath)
writePerformanceMetrics(preparationResult.performance, cmd)
log.info(s"$sourceId finished successfully")
standardizedDF
StandardizationJob.scala
@@ -24,7 +24,7 @@ import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.utils.udf.UDFLibrary

object StandardizationJob extends StandardizationExecution {
private val jobName: String = "Standardisation"
private val jobName: String = "Enceladus Standardization"

def main(args: Array[String]) {
initialValidation()
StandardizationAndConformanceExecution.scala (new file)
@@ -0,0 +1,42 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.standardization_conformance

import za.co.absa.enceladus.common.config.{JobConfigParser, PathConfig}
import za.co.absa.enceladus.conformance.ConformanceExecution
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.standardization.StandardizationExecution
import za.co.absa.enceladus.standardization_conformance.config.StandardizationConformanceConfig
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils

trait StandardizationAndConformanceExecution extends StandardizationExecution with ConformanceExecution {

override def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
val jobCmd = cmd.asInstanceOf[StandardizationConformanceConfig]
val rawPathOverride = jobCmd.rawPathOverride
val publishPathOverride = jobCmd.publishPathOverride
val defaultConfig = getDefaultPathConfig(cmd, dataset, reportVersion)
defaultConfig.copy(rawPath = rawPathOverride.getOrElse(defaultConfig.rawPath),
publishPath = publishPathOverride.getOrElse(defaultConfig.publishPath))
}

override def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit = {
validateIfPathAlreadyExists(fsUtils, pathConfig.standardizationPath)
validateIfPathAlreadyExists(fsUtils, pathConfig.publishPath)
}

override def getInputPath[T](pathCfg: PathConfig): String = super[StandardizationExecution].getInputPath(pathCfg)
}
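StandardizationExecution and ConformanceExecution each provide their own getInputPath (and validateOutputPath, getPathConfig), so the combined trait has to resolve the clash itself; the qualified super[StandardizationExecution] call above selects the standardization-side input, i.e. the raw path, which is where the combined run starts. A minimal, self-contained illustration of the Scala mechanism, with generic names rather than the Enceladus types:

trait Base { def inputPath: String }
trait StdSide extends Base { override def inputPath: String = "raw" }
trait ConfSide extends Base { override def inputPath: String = "standardized" }

// Mixing both sides forces a resolution, because neither implementation
// overrides the other; the qualified super picks one explicitly.
object CombinedJob extends StdSide with ConfSide {
  override def inputPath: String = super[StdSide].inputPath   // "raw"
}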
StandardizationAndConformanceJob.scala
@@ -25,8 +25,8 @@ import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.utils.udf.UDFLibrary

object StandardizationAndConformanceJob extends StandardizationExecution with ConformanceExecution {
private val jobName = "Standardization Conformance"
object StandardizationAndConformanceJob extends StandardizationAndConformanceExecution {
private val jobName = "Enceladus Standardization&Conformance"

def main(args: Array[String]): Unit = {
initialValidation()
ConformanceParserSuite.scala
@@ -179,18 +179,20 @@ class ConformanceParserSuite extends FunSuite with SparkTestBase {
"--folder-prefix", folderPrefix,
"--menas-credentials-file", menasCredentialsFile,
"--debug-set-publish-path", hdfsPublishPathOverride))
val publishPathNoFolderPrefix = TestDynamicConformance.buildPublishPath(cmdConfigNoFolderPrefix,
conformanceDataset, cmdConfigNoFolderPrefix.reportVersion.get)
val publishPathNoFolderPrefix = TestDynamicConformance
.getPathConfig(cmdConfigNoFolderPrefix, conformanceDataset, cmdConfigNoFolderPrefix.reportVersion.get).publishPath
assert(publishPathNoFolderPrefix === s"$hdfsPublishPath/$infoDateColumn=$reportDate/$infoVersionColumn=$reportVersion")
val publishPathFolderPrefix = TestDynamicConformance.buildPublishPath(cmdConfigFolderPrefix,
conformanceDataset, cmdConfigFolderPrefix.reportVersion.get)
val publishPathFolderPrefix = TestDynamicConformance
.getPathConfig(cmdConfigFolderPrefix, conformanceDataset, cmdConfigFolderPrefix.reportVersion.get).publishPath
assert(publishPathFolderPrefix === s"$hdfsPublishPath/$folderPrefix/$infoDateColumn=$reportDate/$infoVersionColumn=$reportVersion")
val publishPathPublishPathOverride = TestDynamicConformance.buildPublishPath(cmdConfigPublishPathOverride, conformanceDataset, cmdConfigPublishPathOverride.reportVersion.get)
val publishPathPublishPathOverride = TestDynamicConformance
.getPathConfig(cmdConfigPublishPathOverride, conformanceDataset, cmdConfigPublishPathOverride.reportVersion.get)
.publishPath
assert(publishPathPublishPathOverride === hdfsPublishPathOverride)

val publishPathPublishPathOverrideAndFolderPrefix =
TestDynamicConformance.buildPublishPath(cmdConfigPublishPathOverrideAndFolderPrefix,
conformanceDataset, cmdConfigPublishPathOverrideAndFolderPrefix.reportVersion.get)
val publishPathPublishPathOverrideAndFolderPrefix = TestDynamicConformance
.getPathConfig(cmdConfigPublishPathOverrideAndFolderPrefix,
conformanceDataset, cmdConfigPublishPathOverrideAndFolderPrefix.reportVersion.get).publishPath
assert(publishPathPublishPathOverrideAndFolderPrefix === hdfsPublishPathOverride)
}

(The diff for the remaining changed file did not load on this page.)