Skip to content

Commit

Permalink
#1371 Implemented feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian-Olosutean committed Jul 27, 2020
1 parent 5054da2 commit daa4617
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,14 @@ trait CommonJobExecution {
}
}

protected def getDefaultPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
protected def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
PathConfig(
rawPath = buildRawPath(cmd, dataset, reportVersion),
publishPath = buildPublishPath(cmd, dataset, reportVersion),
standardizationPath = getStandardizationPath(cmd, reportVersion)
)
}

protected def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig

private def buildPublishPath[T](cmd: JobConfigParser[T], ds: Dataset, reportVersion: Int): String = {
val infoDateCol: String = InfoDateColumn
val infoVersionCol: String = InfoVersionColumn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ trait ConformanceExecution extends CommonJobExecution {
}

override def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
val pathOverride = cmd.asInstanceOf[ConformanceConfig].publishPathOverride
val initialConfig = getDefaultPathConfig(cmd, dataset, reportVersion)
pathOverride match {
val initialConfig = super.getPathConfig(cmd, dataset, reportVersion)
cmd.asInstanceOf[ConformanceConfig].publishPathOverride match {
case None => initialConfig
case Some(providedRawPath) => initialConfig.copy(publishPath = providedRawPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ case class CoalesceRuleInterpreter(rule: CoalesceConformanceRule) extends RuleIn
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO,
(implicit spark: SparkSession,
explosionState: ExplosionState,
dao: MenasDAO,
progArgs: InterpreterContextArgs): Dataset[Row] = {
// Validate the rule parameters
RuleValidators.validateSameParent(progArgs.datasetName, ruleName, rule.inputColumns :+ rule.outputColumn: _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ trait StandardizationExecution extends CommonJobExecution {
}

override def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
val pathOverride = cmd.asInstanceOf[StandardizationConfig].rawPathOverride
val initialConfig = getDefaultPathConfig(cmd, dataset, reportVersion)
pathOverride match {
val initialConfig = super.getPathConfig(cmd, dataset, reportVersion)
cmd.asInstanceOf[StandardizationConfig].rawPathOverride match {
case None => initialConfig
case Some(providedRawPath) => initialConfig.copy(rawPath = providedRawPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@

package za.co.absa.enceladus.standardization_conformance

import za.co.absa.enceladus.common.CommonJobExecution
import za.co.absa.enceladus.common.config.{JobConfigParser, PathConfig}
import za.co.absa.enceladus.conformance.ConformanceExecution
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.standardization.StandardizationExecution
import za.co.absa.enceladus.standardization_conformance.config.StandardizationConformanceConfig
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils

trait StandardizationAndConformanceExecution extends StandardizationExecution with ConformanceExecution {
trait StandardizationAndConformanceExecution extends StandardizationExecution
with ConformanceExecution
with CommonJobExecution{

override def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
val defaultConfig = super[CommonJobExecution].getPathConfig(cmd, dataset, reportVersion)
val jobCmd = cmd.asInstanceOf[StandardizationConformanceConfig]
val rawPathOverride = jobCmd.rawPathOverride
val publishPathOverride = jobCmd.publishPathOverride
val defaultConfig = getDefaultPathConfig(cmd, dataset, reportVersion)
defaultConfig.copy(rawPath = rawPathOverride.getOrElse(defaultConfig.rawPath),
publishPath = publishPathOverride.getOrElse(defaultConfig.publishPath))
}
Expand Down

0 comments on commit daa4617

Please sign in to comment.