#1371 New main job created
Adrian-Olosutean committed Jun 11, 2020
1 parent 3519c3a commit b69c950
Showing 43 changed files with 302 additions and 123 deletions.
@@ -16,7 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -40,7 +40,7 @@ case class UppercaseCustomRuleInterpreter(rule: UppercaseCustomConformanceRule)
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
handleArrays(rule.outputColumn, df) { flattened =>

// we have to do this if this rule is to support arrays
@@ -16,7 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -40,7 +40,7 @@ case class StringFuncInterpreter(rule: ColumnFunctionCustomConformanceRule) exte
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
handleArrays(rule.outputColumn, df) { flattened =>

// we have to do this if this rule is to support arrays
@@ -86,22 +86,21 @@ trait CommonJobExecution {

protected def handleControlInfoValidation(): Unit = {
ControlInfoValidation.addRawAndSourceRecordCountsToMetadata() match {
case Failure(ex: za.co.absa.enceladus.utils.validation.ValidationException) => {
case Failure(ex: za.co.absa.enceladus.utils.validation.ValidationException) =>
val confEntry = "control.info.validation"
conf.getString(confEntry) match {
case "strict" => throw ex
case "warning" => log.warn(ex.msg)
case "none" =>
case _ => throw new RuntimeException(s"Invalid $confEntry value")
}
}
case Failure(ex) => throw ex
case Success(_) =>
}
}

protected def initFunctionalExtensions(reportVersion: Int,
pathCfg: PathCfg,
pathCfg: PathConfig,
isJobStageOnly: Boolean = false,
generateNewRun: Boolean = false)
(implicit spark: SparkSession, dao: MenasDAO,
@@ -132,7 +131,7 @@ trait CommonJobExecution {
generateNewRun)
}

protected def validateForExistingOutputPath(fsUtils: FileSystemVersionUtils, pathCfg: PathCfg): Unit = {
protected def validateForExistingOutputPath(fsUtils: FileSystemVersionUtils, pathCfg: PathConfig): Unit = {
if (fsUtils.hdfsExists(pathCfg.outputPath)) {
throw new IllegalStateException(
s"Path ${pathCfg.outputPath} already exists. Increment the run version, or delete ${pathCfg.outputPath}"
@@ -166,7 +165,7 @@ trait CommonJobExecution {
}
}

def runPostProcessors(errorSourceId: ErrorSourceId.Value, pathCfg: PathCfg, jobCmdConfig: JobCmdConfig, reportVersion: Int)
def runPostProcessors(errorSourceId: ErrorSourceId.Value, pathCfg: PathConfig, jobCmdConfig: JobCmdConfig, reportVersion: Int)
(implicit spark: SparkSession, fileSystemVersionUtils: FileSystemVersionUtils): Unit = {
val df = spark.read.parquet(pathCfg.outputPath)
val runId = MenasPlugin.runNumber
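
The handleControlInfoValidation change above only drops the redundant braces around the first case; the accepted values of control.info.validation are unchanged. As a reading aid, here is a minimal self-contained sketch of that dispatch, assuming Typesafe Config (which the surrounding code appears to use); ValidationException is a local stand-in for za.co.absa.enceladus.utils.validation.ValidationException and println stands in for the job's logger:

import com.typesafe.config.{Config, ConfigFactory}

final case class ValidationException(msg: String) extends Exception(msg)

def applyControlInfoValidationPolicy(conf: Config, failure: ValidationException): Unit = {
  val confEntry = "control.info.validation"
  conf.getString(confEntry) match {
    case "strict"  => throw failure                     // fail the run
    case "warning" => println(s"WARN: ${failure.msg}")  // log and continue
    case "none"    =>                                   // swallow the validation failure
    case _         => throw new RuntimeException(s"Invalid $confEntry value")
  }
}

// Example: a "warning" policy logs and lets the run proceed.
applyControlInfoValidationPolicy(
  ConfigFactory.parseString("control.info.validation = warning"),
  ValidationException("raw/source record counts could not be added to metadata"))
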
@@ -115,4 +115,4 @@ object JobCmdConfig {
}
}

final case class PathCfg(inputPath: String, outputPath: String)
case class PathConfig(inputPath: String, outputPath: String, standardizationPath: Option[String] = None)
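
PathCfg is renamed to PathConfig (and is no longer final), gaining an optional standardizationPath, presumably so the new combined job can track the intermediate standardized folder alongside the input and publish paths. A small usage sketch follows; the concrete paths are made-up examples, not values from this commit:

// Conformance-only usage keeps the original two-field shape:
val conformancePaths = PathConfig(
  inputPath  = "/bigdata/std/mydataset/v1",                 // hypothetical standardized input
  outputPath = "/bigdata/publish/mydataset/info_version=1") // hypothetical publish path

// A combined Standardization+Conformance run can also carry the standardized
// folder it first writes to and then conforms from:
val combinedPaths = conformancePaths.copy(
  standardizationPath = Some("/bigdata/std/mydataset/v1"))
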
@@ -23,13 +23,18 @@ import za.co.absa.enceladus.common.JobCmdConfig
* Note: scopt requires all fields to have default values.
* Even if a field is mandatory it needs a default value.
*/
trait ConfCmdConfigT {
def confConfig: ConfConfig
def jobConfig: JobCmdConfig
}

case class ConfCmdConfig(confConfig: ConfConfig = ConfConfig(),
jobConfig: JobCmdConfig = JobCmdConfig())
jobConfig: JobCmdConfig = JobCmdConfig()) extends ConfCmdConfigT

object ConfCmdConfig {
object ConfCmdConfigT {
val stepName = "Conformance"

def getCmdLineArguments(args: Array[String]): ConfCmdConfig = {
def getCmdLineArguments(args: Array[String]): ConfCmdConfigT = {
val jobConfig = JobCmdConfig.getCmdLineArguments(args, stepName)
val confConfig = ConfConfig.getCmdLineArguments(args)
ConfCmdConfig(confConfig, jobConfig)
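
The central change of this file: the concrete ConfCmdConfig case class stays, but its shape is extracted into the ConfCmdConfigT trait, and the companion (stepName, getCmdLineArguments) moves to ConfCmdConfigT. Because every implicit cmd/progArgs parameter in the files below is widened to the trait, any configuration exposing confConfig and jobConfig can now drive conformance. A minimal sketch of why that matters for a combined main job; CombinedCmdConfig and its extra field are hypothetical illustrations, not code from this commit:

import za.co.absa.enceladus.common.JobCmdConfig
import za.co.absa.enceladus.conformance.{ConfConfig, ConfCmdConfigT}

// A hypothetical configuration for a job that runs Standardization and Conformance
// in one application. It is not a ConfCmdConfig, but it satisfies ConfCmdConfigT,
// so DynamicInterpreter, the rule interpreters and ConformanceReader all accept it.
case class CombinedCmdConfig(confConfig: ConfConfig = ConfConfig(),
                             jobConfig: JobCmdConfig = JobCmdConfig(),
                             rawFormat: String = "parquet") // hypothetical extra field
  extends ConfCmdConfigT

implicit val cmd: ConfCmdConfigT = CombinedCmdConfig()
// DynamicInterpreter.interpret(conformance, inputDf) now resolves its implicit
// progArgs: ConfCmdConfigT with either implementation.
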
@@ -16,7 +16,7 @@
package za.co.absa.enceladus.conformance

import scopt.OptionParser
import za.co.absa.enceladus.conformance.ConfCmdConfig.stepName
import za.co.absa.enceladus.conformance.ConfCmdConfigT.stepName

case class ConfConfig(publishPathOverride: Option[String] = None,
experimentalMappingRule: Option[Boolean] = None,
@@ -23,9 +23,8 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import za.co.absa.atum.AtumImplicits
import za.co.absa.atum.AtumImplicits._
import za.co.absa.enceladus.common.Constants.{InfoDateColumn, InfoDateColumnString, InfoVersionColumn, ReportDateFormat}
import za.co.absa.enceladus.common.RecordIdGeneration.{IdType, getRecordIdGenerationStrategyFromConfig}
import za.co.absa.enceladus.common.{CommonJobExecution, Constants, PathCfg, RecordIdGeneration}
import za.co.absa.enceladus.conformance.DynamicConformanceJob.conf
import za.co.absa.enceladus.common.RecordIdGeneration.getRecordIdGenerationStrategyFromConfig
import za.co.absa.enceladus.common.{CommonJobExecution, Constants, PathConfig, RecordIdGeneration}
import za.co.absa.enceladus.conformance.interpreter.rules.ValidationException
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
@@ -40,18 +39,16 @@ import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

trait ConformanceExecution extends CommonJobExecution {
protected implicit val step = "Conformance"
protected implicit val conformanceStepName: String = "Conformance"
private val conformanceReader = new ConformanceReader(log, conf)

def getPathCfg(cmd: ConfCmdConfig, conformance: Dataset, reportVersion: Int): PathCfg =
PathCfg(
def getPathCfg(cmd: ConfCmdConfigT, conformance: Dataset, reportVersion: Int): PathConfig =
PathConfig(
outputPath = buildPublishPath(cmd, conformance, reportVersion),
inputPath = getStandardizationPath(cmd.jobConfig, reportVersion)
)

def buildPublishPath(cmd: ConfCmdConfig,
ds: Dataset,
reportVersion: Int): String = {
def buildPublishPath(cmd: ConfCmdConfigT, ds: Dataset, reportVersion: Int): String = {
val infoDateCol: String = InfoDateColumn
val infoVersionCol: String = InfoVersionColumn

@@ -66,22 +63,21 @@ trait ConformanceExecution {
}

protected def conform(conformance: Dataset, inputData: sql.Dataset[Row])
(implicit spark: SparkSession, cmd: ConfCmdConfig, dao: MenasDAO): DataFrame = {
(implicit spark: SparkSession, cmd: ConfCmdConfigT, dao: MenasDAO): DataFrame = {
val recordIdGenerationStrategy = getRecordIdGenerationStrategyFromConfig(conf)

implicit val featureSwitcher: FeatureSwitches = conformanceReader.readFeatureSwitches()

Try {
handleControlInfoValidation()
DynamicInterpreter.interpret(conformance, inputData)
} match {
case Failure(e: ValidationException) =>
AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(step, e.getMessage, e.techDetails)
AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(conformanceStepName, e.getMessage, e.techDetails)
throw e
case Failure(NonFatal(e)) =>
val sw = new StringWriter
e.printStackTrace(new PrintWriter(sw))
AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(step, e.getMessage, sw.toString)
AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(conformanceStepName, e.getMessage, sw.toString)
throw e
case Success(conformedDF) =>
if (SchemaUtils.fieldExists(Constants.EnceladusRecordId, conformedDF.schema)) {
@@ -94,11 +90,11 @@

protected def processConformanceResult(result: DataFrame,
performance: PerformanceMeasurer,
pathCfg: PathCfg,
pathCfg: PathConfig,
reportVersion: Int,
menasCredentials: MenasCredentials)
(implicit spark: SparkSession,
cmd: ConfCmdConfig,
cmd: ConfCmdConfigT,
fsUtils: FileSystemVersionUtils): Unit = {
val cmdLineArgs: String = cmd.jobConfig.args.mkString(" ")

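Besides widening ConfCmdConfig to ConfCmdConfigT and PathCfg to PathConfig, this file renames the implicit step to an explicitly typed conformanceStepName. One plausible reason is trait composition for the new main job: an object mixing a conformance trait with a standardization counterpart cannot inherit two concrete members of the same name. A minimal stand-alone illustration under that assumption; these toy traits are not the project's real ones:

// Same member name in two mixed-in traits fails to compile:
trait ConformancePart     { protected implicit val step: String = "Conformance" }
trait StandardizationPart { protected val step: String = "Standardization" }
// object CombinedJob extends ConformancePart with StandardizationPart
//   -> error: object CombinedJob inherits conflicting members: value step ...

// Distinct names compose, and keeping only one of them implicit avoids an
// ambiguous implicit String at call sites:
trait ConformancePart2     { protected implicit val conformanceStepName: String = "Conformance" }
trait StandardizationPart2 { protected val standardizationStepName: String = "Standardization" }
object CombinedJob2 extends ConformancePart2 with StandardizationPart2
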
@@ -23,30 +23,30 @@ class ConformanceReader(log: Logger, conf: Config) {
private val enableCF: Boolean = true
private implicit val config: Config = conf

def isAutocleanStdFolderEnabled()(implicit cmd: ConfCmdConfig): Boolean = {
def isAutocleanStdFolderEnabled()(implicit cmd: ConfCmdConfigT): Boolean = {
val enabled = getCmdOrConfigBoolean(cmd.confConfig.autocleanStandardizedFolder,
"conformance.autoclean.standardized.hdfs.folder",
defaultValue = false)
log.info(s"Autoclean standardized HDFS folder = $enabled")
enabled
}

def readFeatureSwitches()(implicit cmdConfig: ConfCmdConfig): FeatureSwitches = FeatureSwitches()
def readFeatureSwitches()(implicit cmdConfig: ConfCmdConfigT): FeatureSwitches = FeatureSwitches()
.setExperimentalMappingRuleEnabled(isExperimentalRuleEnabled())
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled())
.setControlFrameworkEnabled(enableCF)
.setBroadcastStrategyMode(broadcastingStrategyMode)
.setBroadcastMaxSizeMb(broadcastingMaxSizeMb)

private def isExperimentalRuleEnabled()(implicit cmd: ConfCmdConfig): Boolean = {
private def isExperimentalRuleEnabled()(implicit cmd: ConfCmdConfigT): Boolean = {
val enabled = getCmdOrConfigBoolean(cmd.confConfig.experimentalMappingRule,
"conformance.mapping.rule.experimental.implementation",
defaultValue = false)
log.info(s"Experimental mapping rule enabled = $enabled")
enabled
}

private def isCatalystWorkaroundEnabled()(implicit cmd: ConfCmdConfig): Boolean = {
private def isCatalystWorkaroundEnabled()(implicit cmd: ConfCmdConfigT): Boolean = {
val enabled = getCmdOrConfigBoolean(cmd.confConfig.isCatalystWorkaroundEnabled,
"conformance.catalyst.workaround",
defaultValue = true)
@@ -17,7 +17,7 @@ package za.co.absa.enceladus.conformance

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.SparkSession
import za.co.absa.enceladus.common.JobCmdConfig
import za.co.absa.enceladus.common.{JobCmdConfig, PathConfig}
import za.co.absa.enceladus.common.version.SparkVersionGuard
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.rest.RestDaoFactory
@@ -34,7 +34,7 @@ object DynamicConformanceJob extends ConformanceExecution {

SparkVersionGuard.fromDefaultSparkCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

implicit val cmd: ConfCmdConfig = ConfCmdConfig.getCmdLineArguments(args)
implicit val cmd: ConfCmdConfigT = ConfCmdConfigT.getCmdLineArguments(args)
implicit val jobCmdConfig: JobCmdConfig = cmd.jobConfig
implicit val spark: SparkSession = obtainSparkSession() // initialize spark
implicit val fsUtils: FileSystemVersionUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)
@@ -45,7 +45,8 @@
// get the dataset definition
val dataset = dao.getDataset(jobCmdConfig.datasetName, jobCmdConfig.datasetVersion)
val reportVersion = getReportVersion(cmd.jobConfig, dataset)
val pathCfg = getPathCfg(cmd, dataset, reportVersion)
val pathCfg: PathConfig = getPathCfg(cmd, dataset, reportVersion)


log.info(s"stdpath = ${pathCfg.inputPath}")
log.info(s"publishPath = ${pathCfg.outputPath}")
@@ -62,7 +63,7 @@
val result = conform(dataset, inputData)

processConformanceResult(result, performance, pathCfg, reportVersion, menasCredentials)
log.info(s"$step finished successfully")
log.info(s"$conformanceStepName finished successfully")

runPostProcessors(ErrorSourceId.Conformance, pathCfg, jobCmdConfig, reportVersion)
} finally {
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory
import za.co.absa.atum.AtumImplicits._
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
import za.co.absa.enceladus.conformance.datasource.PartitioningUtils
import za.co.absa.enceladus.conformance.interpreter.rules._
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -51,7 +51,7 @@ object DynamicInterpreter {
*
*/
def interpret(conformance: ConfDataset, inputDf: Dataset[Row], jobShortName: String = "Conformance")
(implicit spark: SparkSession, dao: MenasDAO, progArgs: ConfCmdConfig, featureSwitches: FeatureSwitches): DataFrame = {
(implicit spark: SparkSession, dao: MenasDAO, progArgs: ConfCmdConfigT, featureSwitches: FeatureSwitches): DataFrame = {

implicit val interpreterContext: InterpreterContext = InterpreterContext(inputDf.schema, conformance,
featureSwitches, jobShortName, spark, dao, progArgs)
@@ -76,7 +76,7 @@
(implicit ictx: InterpreterContext): DataFrame = {
implicit val spark: SparkSession = ictx.spark
implicit val dao: MenasDAO = ictx.dao
implicit val progArgs: ConfCmdConfig = ictx.progArgs
implicit val progArgs: ConfCmdConfigT = ictx.progArgs
implicit val udfLib: UDFLibrary = new UDFLibrary
implicit val explosionState: ExplosionState = new ExplosionState()

@@ -17,7 +17,7 @@ package za.co.absa.enceladus.conformance.interpreter

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.ConfCmdConfigT
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.{Dataset => ConfDataset}

@@ -29,5 +29,5 @@ case class InterpreterContext (
jobShortName: String,
spark: SparkSession,
dao: MenasDAO,
progArgs: ConfCmdConfig
progArgs: ConfCmdConfigT
)
@@ -16,7 +16,7 @@
package za.co.absa.enceladus.conformance.interpreter.rules

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
@@ -31,7 +31,7 @@ class ArrayCollapseInterpreter extends RuleInterpreter {
override def conformanceRule: Option[ConformanceRule] = None

override def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
val dfOut = ExplodeTools.revertAllExplosions(df, explosionState.explodeContext, Some(ErrorMessage.errorColumnName))
explosionState.explodeContext = ExplosionContext()
dfOut
@@ -16,7 +16,7 @@
package za.co.absa.enceladus.conformance.interpreter.rules

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
@@ -29,7 +29,7 @@ class ArrayExplodeInterpreter(columnName: String) extends RuleInterpreter {
override def conformanceRule: Option[ConformanceRule] = None

override def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
val (dfOut, ctx) = ExplodeTools.explodeAllArraysInPath(columnName, df, explosionState.explodeContext)
explosionState.explodeContext = ctx
dfOut
@@ -20,7 +20,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.spark.hats.Extensions._
import za.co.absa.enceladus.conformance.ConfCmdConfig
import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{CastingConformanceRule, ConformanceRule}
@@ -35,7 +35,7 @@ case class CastingRuleInterpreter(rule: CastingConformanceRule) extends RuleInte
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
// Validate the rule parameters
RuleValidators.validateInputField(progArgs.jobConfig.datasetName, ruleName, df.schema, rule.inputColumn)
RuleValidators.validateOutputField(progArgs.jobConfig.datasetName, ruleName, df.schema, rule.outputColumn)
(Diff truncated: the remaining changed files are not shown here.)
