Feature/1371 Combined Standardization Conformance Job #1392

Merged
merged 28 commits on Jul 30, 2020
Changes from 10 commits
Commits
28 commits
3519c3a
Merge branch 'feature/1015-extract-common-standardization-conformance…
Adrian-Olosutean Jun 11, 2020
b69c950
#1371 New main job created
Adrian-Olosutean Jun 11, 2020
603329f
#1371 Combined job updated
Adrian-Olosutean Jun 20, 2020
c5f3129
Merge remote-tracking branch 'origin/feature/1015-extract-common-stan…
Adrian-Olosutean Jun 25, 2020
799de19
#1371 Integrate recent refactoring
Adrian-Olosutean Jun 25, 2020
a8088e6
Merge branch 'feature/1015-extract-common-standardization-conformance…
Adrian-Olosutean Jul 14, 2020
73c2923
#1371 Updated combined job with latest refactoring
Adrian-Olosutean Jul 14, 2020
aa68dea
Merge remote-tracking branch 'origin/develop' into feature/1371-combi…
Adrian-Olosutean Jul 14, 2020
455128a
#1371 Merged recent changes
Adrian-Olosutean Jul 14, 2020
879b010
#1371 Reinitializing Control Framework
Adrian-Olosutean Jul 15, 2020
db426be
#1371 Path config documentation + other improvements
Adrian-Olosutean Jul 15, 2020
fd24062
Merge branch 'develop' into feature/1371-combined-std-conf-job
Zejnilovic Jul 18, 2020
b557789
#1371 Other small changes
Adrian-Olosutean Jul 21, 2020
b61fb3e
#1371 Start feedback
Adrian-Olosutean Jul 21, 2020
8c4296a
Merge remote-tracking branch 'origin/develop' into feature/1371-combi…
Adrian-Olosutean Jul 22, 2020
cbe4f03
#1371 Version with raw, publish, stdpath
Adrian-Olosutean Jul 23, 2020
3c7441f
Merge branch 'feature/1371-combined-std-conf-job' of https://github.c…
Adrian-Olosutean Jul 23, 2020
96bcacc
Merge remote-tracking branch 'origin/develop' into feature/1371-combi…
Adrian-Olosutean Jul 24, 2020
c7cc534
#1371 Moved jar assignment
Adrian-Olosutean Jul 24, 2020
5054da2
Merge remote-tracking branch 'origin/develop' into feature/1371-combi…
Adrian-Olosutean Jul 24, 2020
864b7ef
Merge branch 'develop' into feature/1371-combined-std-conf-job
AdrianOlosutean Jul 24, 2020
daa4617
#1371 Implemented feedback
Adrian-Olosutean Jul 27, 2020
dc6b966
Merge remote-tracking branch 'origin/feature/1371-combined-std-conf-j…
Adrian-Olosutean Jul 27, 2020
cc4ef03
#1371 Imports optimization
Adrian-Olosutean Jul 27, 2020
f0d53a3
#1371 Reassigned performance
Adrian-Olosutean Jul 28, 2020
2240788
Merge remote-tracking branch 'origin/develop' into feature/1371-combi…
Adrian-Olosutean Jul 29, 2020
44c5af3
#1371 Conflict resolution
Adrian-Olosutean Jul 29, 2020
f58713d
#1371 Minor code style improvements
Adrian-Olosutean Jul 30, 2020
@@ -16,8 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
import za.co.absa.enceladus.dao.MenasDAO
@@ -40,7 +39,7 @@ case class UppercaseCustomRuleInterpreter(rule: UppercaseCustomConformanceRule)
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: InterpreterContextArgs): Dataset[Row] = {
handleArrays(rule.outputColumn, df) { flattened =>

// we have to do this if this rule is to support arrays
@@ -16,8 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
import za.co.absa.enceladus.dao.MenasDAO
@@ -40,7 +39,7 @@ case class StringFuncInterpreter(rule: ColumnFunctionCustomConformanceRule) exte
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: InterpreterContextArgs): Dataset[Row] = {
handleArrays(rule.outputColumn, df) { flattened =>

// we have to do this if this rule is to support arrays
@@ -36,6 +36,7 @@ import za.co.absa.enceladus.utils.config.SecureConfig
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.general.ProjectMetadataTools
import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.utils.modules.SourcePhase.{Conformance, Standardization}
Review comment (Collaborator): Conformance not used

import za.co.absa.enceladus.utils.performance.PerformanceMeasurer
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

@@ -87,8 +88,14 @@ trait CommonJobExecution {

log.info(s"input path: ${pathCfg.inputPath}")
log.info(s"output path: ${pathCfg.outputPath}")

// die if the output path exists
validateForExistingOutputPath(fsUtils, pathCfg)
validateForExistingOutputPath(fsUtils, pathCfg.outputPath)

pathCfg.standardizationPath.foreach(standardizationPath => {
log.info(s"standardization path: $standardizationPath")
validateForExistingOutputPath(fsUtils, standardizationPath)
})

val performance = initPerformanceMeasurer(pathCfg.inputPath)

@@ -104,7 +111,8 @@

protected def runPostProcessing[T](sourceId: SourcePhase, preparationResult: PreparationResult, jobCmdConfig: JobConfigParser[T])
(implicit spark: SparkSession, fileSystemVersionUtils: FileSystemVersionUtils): Unit = {
val df = spark.read.parquet(preparationResult.pathCfg.outputPath)
val outputPath = preparationResult.pathCfg.standardizationPath.getOrElse(preparationResult.pathCfg.outputPath)
val df = spark.read.parquet(outputPath)
val runId = MenasPlugin.runNumber

if (runId.isEmpty) {
@@ -122,7 +130,7 @@
val uniqueRunId = Atum.getControlMeasure.runUniqueId

val params = ErrorSenderPluginParams(jobCmdConfig.datasetName,
jobCmdConfig.datasetVersion, jobCmdConfig.reportDate, preparationResult.reportVersion, preparationResult.pathCfg.outputPath,
jobCmdConfig.datasetVersion, jobCmdConfig.reportDate, preparationResult.reportVersion, outputPath,
sourceId, sourceSystem, runUrl, runId, uniqueRunId, Instant.now)
val postProcessingService = PostProcessingService(conf, params)
postProcessingService.onSaveOutput(df)
@@ -168,10 +176,10 @@
}
}

protected def validateForExistingOutputPath(fsUtils: FileSystemVersionUtils, pathCfg: PathConfig): Unit = {
if (fsUtils.hdfsExists(pathCfg.outputPath)) {
protected def validateForExistingOutputPath(fsUtils: FileSystemVersionUtils, path: String): Unit = {
if (fsUtils.hdfsExists(path)) {
throw new IllegalStateException(
s"Path ${pathCfg.outputPath} already exists. Increment the run version, or delete ${pathCfg.outputPath}"
s"Path $path already exists. Increment the run version, or delete $path"
)
}
}
@@ -15,4 +15,4 @@

package za.co.absa.enceladus.common.config

case class PathConfig(inputPath: String, outputPath: String)
case class PathConfig(inputPath: String, outputPath: String, standardizationPath: Option[String] = None)
Zejnilovic marked this conversation as resolved.
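
To make the new optional field concrete, here is a rough sketch of how the three jobs could populate PathConfig after this change; the concrete paths and value names below are made-up examples, not taken from the PR:

// Illustrative only: paths and val names are invented for this sketch.
// Standalone Standardization: raw in, standardized out, no extra path.
val stdOnly = PathConfig(
  inputPath = "/bigdata/raw/mydataset/v1",
  outputPath = "/bigdata/std/mydataset/v1")

// Standalone Conformance: standardized in, publish out, no extra path.
val confOnly = PathConfig(
  inputPath = "/bigdata/std/mydataset/v1",
  outputPath = "/bigdata/publish/mydataset/v1")

// Combined job: raw in, publish out, plus the intermediate standardization path.
// CommonJobExecution now validates this path for pre-existing output as well, and
// ConformanceExecution reads its input via standardizationPath.getOrElse(inputPath).
val combined = PathConfig(
  inputPath = "/bigdata/raw/mydataset/v1",
  outputPath = "/bigdata/publish/mydataset/v1",
  standardizationPath = Some("/bigdata/std/mydataset/v1"))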
@@ -55,7 +55,12 @@ trait ConformanceExecution extends CommonJobExecution {
// Enable Control Framework
import za.co.absa.atum.AtumImplicits.SparkSessionWrapper

spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.inputPath}/_INFO")
// reinitialize Control Framework in case of combined job
val standardizationPath = preparationResult.pathCfg.standardizationPath
standardizationPath.foreach(_ => spark.disableControlMeasuresTracking())

val inputPath = standardizationPath.getOrElse(preparationResult.pathCfg.inputPath)
spark.enableControlMeasuresTracking(s"$inputPath/_INFO")
.setControlMeasuresWorkflow(sourceId.toString)

// Enable control framework performance optimization for pipeline-like jobs
@@ -74,8 +79,8 @@
spark.read.parquet(pathCfg.inputPath)
}

protected def conform(inputData: DataFrame, preparationResult: PreparationResult)
(implicit spark: SparkSession, cmd: ConformanceConfig, dao: MenasDAO): DataFrame = {
protected def conform[T](inputData: DataFrame, preparationResult: PreparationResult)
(implicit spark: SparkSession, cmd: ConformanceParser[T], dao: MenasDAO): DataFrame = {
val recordIdGenerationStrategy = getRecordIdGenerationStrategyFromConfig(conf)

implicit val featureSwitcher: FeatureSwitches = conformanceReader.readFeatureSwitches()
@@ -101,18 +106,19 @@
}
}

protected def processConformanceResult(args: Array[String],
protected def processConformanceResult[T](args: Array[String],
result: DataFrame,
preparationResult: PreparationResult,
menasCredentials: MenasCredentials)
(implicit spark: SparkSession,
cmd: ConformanceConfig,
cmd: ConformanceParser[T],
fsUtils: FileSystemVersionUtils): Unit = {
val cmdLineArgs: String = args.mkString(" ")

val standardizationPath = preparationResult.pathCfg.standardizationPath.getOrElse(preparationResult.pathCfg.inputPath)
Review comment (Collaborator): For any reader of the code who is not fully familiar with its structure, this would be rather confusing... (similarly in StandardizationExecution)

PerformanceMetricTools.addJobInfoToAtumMetadata(
"conform",
preparationResult.pathCfg.inputPath,
standardizationPath,
preparationResult.pathCfg.outputPath,
menasCredentials.username, cmdLineArgs
)
@@ -140,7 +146,7 @@
PerformanceMetricTools.addPerformanceMetricsToAtumMetadata(
spark,
"conform",
preparationResult.pathCfg.inputPath,
standardizationPath,
preparationResult.pathCfg.outputPath,
menasCredentials.username, cmdLineArgs
)
Expand All @@ -149,7 +155,7 @@ trait ConformanceExecution extends CommonJobExecution {
writePerformanceMetrics(preparationResult.performance, cmd)

if (conformanceReader.isAutocleanStdFolderEnabled()) {
fsUtils.deleteDirectoryRecursively(preparationResult.pathCfg.inputPath)
fsUtils.deleteDirectoryRecursively(standardizationPath)
}
log.info(s"$sourceId finished successfully")
}
@@ -17,7 +17,7 @@ package za.co.absa.enceladus.conformance

import com.typesafe.config.{Config, ConfigFactory}
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.config.ConformanceParser
import za.co.absa.enceladus.utils.config.ConfigUtils.ConfigImplicits
import za.co.absa.enceladus.conformance.interpreter.{FeatureSwitches, ThreeStateSwitch}
import ConformancePropertiesProvider._
@@ -30,26 +30,26 @@ class ConformancePropertiesProvider {
private val log: Logger = LoggerFactory.getLogger(this.getClass)
private implicit val conf: Config = ConfigFactory.load()

def isAutocleanStdFolderEnabled()(implicit cmd: ConformanceConfig): Boolean = {
def isAutocleanStdFolderEnabled[T]()(implicit cmd: ConformanceParser[T]): Boolean = {
val enabled = getCmdOrConfigBoolean(cmd.autocleanStandardizedFolder, standardizedHdfsFolderKey, defaultValue = false)
log.info(s"Autoclean standardized HDFS folder = $enabled")
enabled
}

def readFeatureSwitches()(implicit cmdConfig: ConformanceConfig): FeatureSwitches = FeatureSwitches()
def readFeatureSwitches[T]()(implicit cmdConfig: ConformanceParser[T]): FeatureSwitches = FeatureSwitches()
.setExperimentalMappingRuleEnabled(isExperimentalRuleEnabled())
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled())
.setControlFrameworkEnabled(enableCF)
.setBroadcastStrategyMode(broadcastingStrategyMode)
.setBroadcastMaxSizeMb(broadcastingMaxSizeMb)

private def isExperimentalRuleEnabled()(implicit cmd: ConformanceConfig): Boolean = {
private def isExperimentalRuleEnabled[T]()(implicit cmd: ConformanceParser[T]): Boolean = {
val enabled = getCmdOrConfigBoolean(cmd.experimentalMappingRule, experimentalRuleKey, defaultValue = false)
log.info(s"Experimental mapping rule enabled = $enabled")
enabled
}

private def isCatalystWorkaroundEnabled()(implicit cmd: ConformanceConfig): Boolean = {
private def isCatalystWorkaroundEnabled[T]()(implicit cmd: ConformanceParser[T]): Boolean = {
val enabled = getCmdOrConfigBoolean(cmd.isCatalystWorkaroundEnabled, catalystWorkaroundKey, defaultValue = true)
log.info(s"Catalyst workaround enabled = $enabled")
enabled
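
These provider methods are now generic in ConformanceParser[T]. Judging only from the accessors used on cmd in this file, the trait presumably exposes at least the three optional command-line switches below; this is an inferred sketch, not the trait definition from the PR:

// Assumed shape, inferred from cmd.experimentalMappingRule, cmd.isCatalystWorkaroundEnabled
// and cmd.autocleanStandardizedFolder being passed to getCmdOrConfigBoolean with a config
// fallback; the real trait in the PR may declare more members.
trait ConformanceParser[T] {
  def experimentalMappingRule: Option[Boolean]
  def isCatalystWorkaroundEnabled: Option[Boolean]
  def autocleanStandardizedFolder: Option[Boolean]
}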
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.LoggerFactory
import za.co.absa.atum.AtumImplicits._
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.config.ConformanceParser
import za.co.absa.enceladus.conformance.datasource.PartitioningUtils
import za.co.absa.enceladus.conformance.interpreter.rules._
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -50,11 +50,11 @@ object DynamicInterpreter {
* @return The conformed DataFrame.
*
*/
def interpret(conformance: ConfDataset, inputDf: Dataset[Row], jobShortName: String = "Conformance")
(implicit spark: SparkSession, dao: MenasDAO, progArgs: ConformanceConfig, featureSwitches: FeatureSwitches): DataFrame = {
def interpret[T](conformance: ConfDataset, inputDf: Dataset[Row], jobShortName: String = "Conformance")
benedeki marked this conversation as resolved.
(implicit spark: SparkSession, dao: MenasDAO, progArgs: ConformanceParser[T], featureSwitches: FeatureSwitches): DataFrame = {

implicit val interpreterContext: InterpreterContext = InterpreterContext(inputDf.schema, conformance,
featureSwitches, jobShortName, spark, dao, progArgs)
featureSwitches, jobShortName, spark, dao, InterpreterContextArgs.fromConformanceConfig(progArgs))

applyCheckpoint(inputDf, "Start")

@@ -76,7 +76,7 @@
(implicit ictx: InterpreterContext): DataFrame = {
implicit val spark: SparkSession = ictx.spark
implicit val dao: MenasDAO = ictx.dao
implicit val progArgs: ConformanceConfig = ictx.progArgs
implicit val progArgs: InterpreterContextArgs = ictx.progArgs
implicit val udfLib: UDFLibrary = new UDFLibrary
implicit val explosionState: ExplosionState = new ExplosionState()

@@ -17,17 +17,48 @@ package za.co.absa.enceladus.conformance.interpreter

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import org.apache.spark.storage.StorageLevel
import za.co.absa.enceladus.conformance.config.{ConformanceConfig, ConformanceParser}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.standardization_conformance.config.StdConformanceConfig

/** Holds everything that is needed in between dynamic conformance interpreter stages */
case class InterpreterContext (
schema: StructType,
conformance: ConfDataset,
featureSwitches: FeatureSwitches,
jobShortName: String,
spark: SparkSession,
dao: MenasDAO,
progArgs: ConformanceConfig
)

case class InterpreterContextArgs(datasetName: String,
reportDate: String = "",
Review comment (Contributor): misaligned

persistStorageLevel: Option[StorageLevel] = None
)

object InterpreterContextArgs {
def fromConformanceConfig[T](conformanceConfig: ConformanceParser[T]): InterpreterContextArgs = {
Review comment (Collaborator): `ConformanceParser` is really a misleading class name. It's not parsing Conformance, it's carrying (maybe parsing) Conformance configuration.

Reply (Contributor Author): Then ConformanceConfigParser?

Reply (Collaborator): I like that. If considered too long, I don't have a problem with ConfConfigParser. Btw, same with StandardizationParser.


conformanceConfig match {
case ConformanceConfigInstanceInterpreter(interpreterContextArgs) => interpreterContextArgs
case StdConformanceConfigInstanceInterpreter(interpreterContextArgs) => interpreterContextArgs
case _ => throw new Exception("")
}
}
}

object ConformanceConfigInstanceInterpreter {
def unapply(conformanceInstance: ConformanceConfig): Option[InterpreterContextArgs] =
Some(InterpreterContextArgs(conformanceInstance.datasetName: String, conformanceInstance.reportDate: String,
conformanceInstance.persistStorageLevel: Option[StorageLevel]))
}

object StdConformanceConfigInstanceInterpreter {
def unapply(conformanceInstance: StdConformanceConfig): Option[InterpreterContextArgs] =
Some(InterpreterContextArgs(conformanceInstance.datasetName: String, conformanceInstance.reportDate: String,
conformanceInstance.persistStorageLevel: Option[StorageLevel]))
}

case class InterpreterContext(
schema: StructType,
conformance: ConfDataset,
featureSwitches: FeatureSwitches,
jobShortName: String,
spark: SparkSession,
dao: MenasDAO,
progArgs: InterpreterContextArgs
)
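
For readers not familiar with the pattern used in fromConformanceConfig above: ConformanceConfigInstanceInterpreter and StdConformanceConfigInstanceInterpreter are extractor objects, so each case arm performs a type test on the matched config and then calls the corresponding unapply. A standalone sketch of the same mechanism, with invented names that are not part of the PR:

// Illustrative extractor-object dispatch; all names here are made up.
final case class ArgsA(name: String)
final case class ArgsB(name: String)
final case class Extracted(name: String)

object FromA {
  // `case FromA(x) => ...` first type-tests the scrutinee against ArgsA, then calls this unapply
  def unapply(a: ArgsA): Option[Extracted] = Some(Extracted(a.name))
}

object FromB {
  def unapply(b: ArgsB): Option[Extracted] = Some(Extracted(b.name))
}

def extract(config: Any): Extracted = config match {
  case FromA(e) => e
  case FromB(e) => e
  case other    => throw new IllegalArgumentException(s"Unsupported config: $other")
}

extract(ArgsA("myDataset"))  // Extracted(myDataset)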
@@ -16,8 +16,7 @@
package za.co.absa.enceladus.conformance.interpreter.rules

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.utils.error.ErrorMessage
@@ -31,7 +30,8 @@ class ArrayCollapseInterpreter extends RuleInterpreter {
override def conformanceRule: Option[ConformanceRule] = None

override def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO,
progArgs: InterpreterContextArgs): Dataset[Row] = {
val dfOut = ExplodeTools.revertAllExplosions(df, explosionState.explodeContext, Some(ErrorMessage.errorColumnName))
explosionState.explodeContext = ExplosionContext()
dfOut
@@ -16,8 +16,7 @@
package za.co.absa.enceladus.conformance.interpreter.rules

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.utils.explode.ExplodeTools
@@ -29,7 +28,8 @@ class ArrayExplodeInterpreter(columnName: String) extends RuleInterpreter {
override def conformanceRule: Option[ConformanceRule] = None

override def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO,
progArgs: InterpreterContextArgs): Dataset[Row] = {
val (dfOut, ctx) = ExplodeTools.explodeAllArraysInPath(columnName, df, explosionState.explodeContext)
explosionState.explodeContext = ctx
dfOut
@@ -19,9 +19,8 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.spark.hats.Extensions._
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, RuleValidators}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{CastingConformanceRule, ConformanceRule}
import za.co.absa.enceladus.utils.schema.SchemaUtils
@@ -35,7 +34,8 @@ case class CastingRuleInterpreter(rule: CastingConformanceRule) extends RuleInte
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO,
progArgs: InterpreterContextArgs): Dataset[Row] = {
// Validate the rule parameters
RuleValidators.validateInputField(progArgs.datasetName, ruleName, df.schema, rule.inputColumn)
RuleValidators.validateOutputField(progArgs.datasetName, ruleName, df.schema, rule.outputColumn)