
#2022 spark-commons #2023

Merged: 6 commits, Mar 9, 2022
@@ -18,7 +18,7 @@ package za.co.absa.enceladus.dao.rest.auth
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.enceladus.dao.auth.MenasPlainCredentials
import za.co.absa.enceladus.utils.fs.LocalFsUtils
-import za.co.absa.enceladus.utils.testUtils.SparkTestBase
+import za.co.absa.spark.commons.test.SparkTestBase

class MenasPlainCredentialsSuite extends AnyWordSpec with SparkTestBase {

@@ -24,7 +24,8 @@ import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, Feature
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
-import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, SparkTestBase}
+import za.co.absa.enceladus.utils.testUtils.HadoopFsTestBase
+import za.co.absa.spark.commons.test.SparkTestBase


case class TestInputRow(id: Int, mandatoryString: String, nullableString: Option[String])
@@ -26,8 +26,8 @@ import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.model.Dataset
-import za.co.absa.enceladus.utils.fs.HadoopFsUtils
-import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, SparkTestBase}
+import za.co.absa.enceladus.utils.testUtils.HadoopFsTestBase
+import za.co.absa.spark.commons.test.SparkTestBase

case class XPadTestInputRow(intField: Int, stringField: Option[String])
case class XPadTestOutputRow(intField: Int, stringField: Option[String], targetField: String)
@@ -23,12 +23,12 @@ import za.co.absa.enceladus.plugins.api.postprocessor.PostProcessor
import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams}
import za.co.absa.enceladus.plugins.builtin.errorsender.DceError
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPluginImpl.SingleErrorStardardized
-import za.co.absa.enceladus.utils.schema.SchemaUtils
import KafkaErrorSenderPluginImpl._
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
import za.co.absa.enceladus.utils.error.ErrorMessage.ErrorCodes
import za.co.absa.enceladus.utils.modules._
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

import scala.util.{Failure, Success, Try}

@@ -48,7 +48,7 @@ case class KafkaErrorSenderPluginImpl(connectionParams: KafkaConnectionParams,
* @param paramsMap Additional key/value parameters provided by Enceladus.
*/
override def onDataReady(dataFrame: DataFrame, paramsMap: Map[String, String]): Unit = {
-if (!SchemaUtils.fieldExists(ColumnNames.enceladusRecordId, dataFrame.schema)) {
+if (!dataFrame.schema.fieldExists(ColumnNames.enceladusRecordId)) {
throw new IllegalStateException(
s"${this.getClass.getName} requires ${ColumnNames.enceladusRecordId} column to be present in the dataframe!"
)
@@ -32,7 +32,7 @@ import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPlugi
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
import za.co.absa.enceladus.utils.modules.SourcePhase
-import za.co.absa.enceladus.utils.testUtils.SparkTestBase
+import za.co.absa.spark.commons.test.SparkTestBase


class KafkaErrorSenderPluginSuite extends AnyFlatSpec with SparkTestBase with Matchers with BeforeAndAfterAll {
pom.xml (3 changes: 2 additions & 1 deletion)
@@ -140,7 +140,8 @@
<scoverage.maven.plugin.version>1.3.0</scoverage.maven.plugin.version>
<!--dependency versions-->
<abris.version>3.1.1</abris.version>
-<absa.commons.version>0.0.27</absa.commons.version>
+<absa.commons.version>1.0.0</absa.commons.version>
+<absa.spark.commons.version>0.2.0</absa.spark.commons.version>
<atum.version>3.7.0</atum.version>
<bower.chart.js.version>2.7.3</bower.chart.js.version>
<bson.codec.jsr310.version>3.5.4</bson.codec.jsr310.version>
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.stereotype.Component
import java.io.ByteArrayOutputStream

+import za.co.absa.spark.commons.utils.SchemaUtils
import za.co.absa.enceladus.rest_api.models.rest.exceptions.SchemaParsingException
-import za.co.absa.enceladus.utils.schema.SchemaUtils

import scala.util.control.NonFatal

@@ -17,13 +17,13 @@ package za.co.absa.enceladus.rest_api.utils.converters

import org.apache.spark.sql.types._
import za.co.absa.enceladus.model._
-import za.co.absa.enceladus.utils.testUtils.SparkTestBase
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.databind.SerializationFeature
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.enceladus.rest_api.models.rest.exceptions.SchemaParsingException
+import za.co.absa.spark.commons.test.SparkTestBase

class SparkMenasSchemaConvertorSuite extends AnyFunSuite with SparkTestBase {
private val objectMapper = new ObjectMapper()
@@ -326,11 +326,13 @@ trait CommonJobExecution extends ProjectMetadata {
}

protected def addInfoColumns(intoDf: DataFrame, reportDate: String, reportVersion: Int): DataFrame = {
-import za.co.absa.enceladus.utils.implicits.DataFrameImplicits.DataFrameEnhancements
+import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
+val function: (DataFrame, String) => DataFrame = (df: DataFrame, _) =>
+df.withColumn("errCol", lit(Array.emptyIntArray))
intoDf
-.withColumnIfDoesNotExist(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
-.withColumnIfDoesNotExist(InfoDateColumnString, lit(reportDate))
-.withColumnIfDoesNotExist(InfoVersionColumn, lit(reportVersion))
+.withColumnIfDoesNotExist(function)(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
@dk1844 (Contributor) commented on Feb 25, 2022:

It is late, but I think I take issue with DataFrameImplicits.DataFrameEnhancements.withColumnIfDoesNotExist(ifExists: (DataFrame, String) => DataFrame)(colName: String, colExpr: Column), which apparently originated in AbsaOSS/spark-commons#18.

IMHO the method's name simply does not match what it does and what parameters it requires: the name suggests a no-op when the column already exists, yet the method allows an action in that case, too. My vote is either to have two separate methods, withColumnIfDoesNotExist and transformIfColumnExists, or at least to rename this method so that it makes sense.

Here, you can only alleviate the problem by renaming the function to something more descriptive, like ifExistsFn or similar. (This occurs in multiple files in the codebase.)
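For illustration, a minimal sketch of the split proposed above; the names, the simplified signatures, and the top-level-only column check are hypothetical, not the actual spark-commons API:

```scala
import org.apache.spark.sql.{Column, DataFrame}

object DataFrameEnhancementsSketch {

  implicit class RichDataFrame(val df: DataFrame) extends AnyVal {

    // Adds the column only when it is absent; a true no-op when it already exists.
    // (Checks top-level columns only, for brevity.)
    def withColumnIfDoesNotExist(colName: String, colExpr: Column): DataFrame =
      if (df.columns.contains(colName)) df else df.withColumn(colName, colExpr)

    // Applies a caller-supplied transformation only when the column is already present.
    def transformIfColumnExists(colName: String)(ifExists: DataFrame => DataFrame): DataFrame =
      if (df.columns.contains(colName)) ifExists(df) else df
  }
}
```

Call sites that only need the add-if-missing behaviour would then read df.withColumnIfDoesNotExist("errCol", lit(Array.emptyIntArray)), with the exists-branch logic moved to an explicit transformIfColumnExists call.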

A collaborator replied:

I tend to agree. At minimum, the parameter order seems wrong, and the ifExists parameter could have a default value.
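A rough sketch of that suggestion, assuming a reordered signature with a defaulted ifExists; again hypothetical, not the current spark-commons method:

```scala
import org.apache.spark.sql.{Column, DataFrame}

object ReorderedSignatureSketch {

  // colName and colExpr come first; ifExists is last and defaults to leaving the frame untouched.
  def withColumnIfDoesNotExist(df: DataFrame, colName: String, colExpr: Column)
                              (ifExists: (DataFrame, String) => DataFrame = (d, _) => d): DataFrame =
    if (df.columns.contains(colName)) ifExists(df, colName) // column present: delegate (no-op by default)
    else df.withColumn(colName, colExpr)                    // column absent: add it
}
```

With the default in place, a call like withColumnIfDoesNotExist(df, InfoVersionColumn, lit(reportVersion))() would need no handler for the common case.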

+.withColumnIfDoesNotExist(function)(InfoDateColumnString, lit(reportDate))
+.withColumnIfDoesNotExist(function)(InfoVersionColumn, lit(reportVersion))
}

private def getReportVersion[T](jobConfig: JobConfigParser[T], dataset: Dataset)(implicit hadoopConf: Configuration): Int = {
@@ -23,7 +23,7 @@ import za.co.absa.enceladus.utils.config.PathWithFs
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.general.ProjectMetadata
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
-import za.co.absa.enceladus.utils.schema.SchemaUtils
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

object PerformanceMetricTools extends ProjectMetadata {

@@ -189,7 +189,7 @@
* when running a Standardization or a Dynamic Conformance job. */
private def getNumberOfErrors(spark: SparkSession, outputPath: String): (Long, Long, Long) = {
val df = spark.read.parquet(outputPath)
-val errorCountColumn = SchemaUtils.getClosestUniqueName("enceladus_error_count", df.schema)
+val errorCountColumn = df.schema.getClosestUniqueName("enceladus_error_count")
val errCol = col(ErrorMessage.errorColumnName)
val numRecordsFailed = df.filter(size(errCol) > 0).count
val numRecordsSuccessful = df.filter(size(errCol) === 0).count
@@ -37,7 +37,7 @@ import za.co.absa.enceladus.utils.config.{ConfigReader, PathWithFs}
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.common.performance.PerformanceMetricTools
-import za.co.absa.enceladus.utils.schema.SchemaUtils
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -118,7 +118,7 @@ trait ConformanceExecution extends CommonJobExecution {
spark.setControlMeasurementError(sourceId.toString, e.getMessage, sw.toString)
throw e
case Success(conformedDF) =>
-if (SchemaUtils.fieldExists(Constants.EnceladusRecordId, conformedDF.schema)) {
+if (conformedDF.schema.fieldExists(Constants.EnceladusRecordId)) {
conformedDF // no new id regeneration
} else {
RecordIdGeneration.addRecordIdColumnByStrategy(conformedDF, Constants.EnceladusRecordId, recordIdGenerationStrategy)
@@ -67,7 +67,7 @@ class HyperConformance (menasBaseUrls: List[String],

def applyConformanceTransformations(rawDf: DataFrame, conformance: Dataset)
(implicit sparkSession: SparkSession, menasDAO: MenasDAO): DataFrame = {
-import za.co.absa.enceladus.utils.implicits.DataFrameImplicits.DataFrameEnhancements
+import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

val schema: StructType = menasDAO.getSchema(conformance.schemaName, conformance.schemaVersion)
val schemaFields = if (schema == null) List() else schema.fields.toList
@@ -78,11 +78,13 @@
// using HDFS implementation until HyperConformance is S3-ready
implicit val hdfs: FileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
implicit val hdfsUtils: HadoopFsUtils = HadoopFsUtils.getOrCreate(hdfs)
+val function: (DataFrame, String) => DataFrame = (df: DataFrame, _) =>
+df.withColumn("errCol", lit(Array.emptyIntArray))

val conformedDf = DynamicInterpreter().interpret(conformance, rawDf)
-.withColumnIfDoesNotExist(InfoDateColumn, coalesce(infoDateColumn, current_date()))
-.withColumnIfDoesNotExist(InfoDateColumnString, coalesce(date_format(infoDateColumn,"yyyy-MM-dd"), lit("")))
-.withColumnIfDoesNotExist(InfoVersionColumn, infoVersionColumn)
+.withColumnIfDoesNotExist(function)(InfoDateColumn, coalesce(infoDateColumn, current_date()))
+.withColumnIfDoesNotExist(function)(InfoDateColumnString, coalesce(date_format(infoDateColumn,"yyyy-MM-dd"), lit("")))
+.withColumnIfDoesNotExist(function)(InfoVersionColumn, infoVersionColumn)
conformedDf
}

@@ -33,11 +33,11 @@ import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, _}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.config.PathWithFs
import za.co.absa.enceladus.utils.error.ErrorMessage
-import za.co.absa.enceladus.utils.explode.ExplosionContext
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.general.Algorithms
-import za.co.absa.enceladus.utils.schema.SchemaUtils
import za.co.absa.enceladus.utils.udf.UDFLibrary
+import za.co.absa.spark.commons.utils.explode.ExplosionContext
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays

case class DynamicInterpreter(implicit inputFs: FileSystem) {
private val log = LoggerFactory.getLogger(this.getClass)
@@ -72,7 +72,7 @@

private def findOriginalColumnsModificationRules(steps: List[ConformanceRule],
schema: StructType): Seq[ConformanceRule] = {
-steps.filter(rule => SchemaUtils.fieldExists(rule.outputColumn, schema))
+steps.filter(rule => schema.fieldExists(rule.outputColumn))
}

/**
@@ -182,7 +182,7 @@
if (isGroupExplosionUsable(rules) &&
ictx.featureSwitches.experimentalMappingRuleEnabled) {
// Inserting an explosion and a collapse between a group of mapping rules operating on a common array
-val optArray = SchemaUtils.getDeepestArrayPath(schema, rules.head.outputColumn)
+val optArray = schema.getDeepestArrayPath(rules.head.outputColumn)
optArray match {
case Some(arrayColumn) =>
new ArrayExplodeInterpreter(arrayColumn) :: (interpreters :+ new ArrayCollapseInterpreter())
@@ -402,7 +402,7 @@
*/
private def groupMappingRules(rules: List[ConformanceRule], schema: StructType): List[List[ConformanceRule]] = {
Algorithms.stableGroupByOption[ConformanceRule, String](rules, {
-case m: MappingConformanceRule => SchemaUtils.getDeepestArrayPath(schema, m.outputColumn)
+case m: MappingConformanceRule => schema.getDeepestArrayPath(m.outputColumn)
case _ => None
}).map(_.toList).toList
}
@@ -15,7 +15,7 @@

package za.co.absa.enceladus.conformance.interpreter

-import za.co.absa.enceladus.utils.explode.ExplosionContext
+import za.co.absa.spark.commons.utils.explode.ExplosionContext

/**
* This class is used to encapsulate a state of exploded arrays during processing of dynamic conformance steps
@@ -19,7 +19,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
-import za.co.absa.enceladus.utils.schema.SchemaUtils
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

class OptimizerTimeTracker(inputDf: DataFrame, isWorkaroundEnabled: Boolean)(implicit spark: SparkSession) {
import spark.implicits._
@@ -31,7 +31,7 @@ class OptimizerTimeTracker(inputDf: DataFrame, isWorkaroundEnabled: Boolean)(imp
private var baselineTimeMs = initialElapsedTimeBaselineMs
private var lastExecutionPlanOptimizationTime = 0L

-private val idField1 = SchemaUtils.getUniqueName("tmpId", Option(inputDf.schema))
+private val idField1 = inputDf.schema.getClosestUniqueName("tmpId")
private val idField2 = s"${idField1}_2"
private val dfWithId = inputDf.withColumn(idField1, lit(1))
private val dfJustId = Seq(1).toDF(idField2).cache()
@@ -20,7 +20,8 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.utils.error.ErrorMessage
-import za.co.absa.enceladus.utils.explode.{ExplodeTools, ExplosionContext}
+import za.co.absa.spark.commons.utils.ExplodeTools
+import za.co.absa.spark.commons.utils.explode.ExplosionContext

/**
* This conformance interpreter collapses previously exploded array(s) back.
@@ -19,7 +19,7 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
-import za.co.absa.enceladus.utils.explode.ExplodeTools
+import za.co.absa.spark.commons.utils.ExplodeTools

/**
* This conformance interpreter explodes a given array.
@@ -23,8 +23,9 @@ import za.co.absa.spark.hats.Extensions._
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{CastingConformanceRule, ConformanceRule}
-import za.co.absa.enceladus.utils.schema.SchemaUtils
import za.co.absa.enceladus.utils.udf.UDFNames
+import za.co.absa.spark.commons.implicits.DataTypeImplicits.DataTypeEnhancements
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.hats.transformations.NestedArrayTransformations

case class CastingRuleInterpreter(rule: CastingConformanceRule) extends RuleInterpreter {
@@ -41,13 +42,13 @@
RuleValidators.validateOutputField(progArgs.datasetName, ruleName, df.schema, rule.outputColumn)
RuleValidators.validateSameParent(progArgs.datasetName, ruleName, rule.inputColumn, rule.outputColumn)

-SchemaUtils.getFieldType(rule.inputColumn, df.schema)
+df.schema.getFieldType(rule.inputColumn)
.foreach(dt => RuleValidators.validateTypeCompatibility(ruleName, rule.inputColumn, dt, rule.outputDataType))

-val sourceDataType = SchemaUtils.getFieldType(rule.inputColumn, df.schema).get
+val sourceDataType = df.schema.getFieldType(rule.inputColumn).get
val targetDataType = CatalystSqlParser.parseDataType(rule.outputDataType)

-if (SchemaUtils.isCastAlwaysSucceeds(sourceDataType, targetDataType)) {
+if (sourceDataType.doesCastAlwaysSucceed(targetDataType)) {
// Casting to string does not generate errors
df.nestedMapColumn(rule.inputColumn, rule.outputColumn, c =>
c.cast(rule.outputDataType)
@@ -20,7 +20,7 @@ import za.co.absa.spark.hats.Extensions._
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, DropConformanceRule}
-import za.co.absa.enceladus.utils.schema.SchemaUtils
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

case class DropRuleInterpreter(rule: DropConformanceRule) extends RuleInterpreter {

@@ -29,7 +29,7 @@ case class DropRuleInterpreter(rule: DropConformanceRule) extends RuleInterprete
def conform(df: Dataset[Row])
(implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO,
progArgs: InterpreterContextArgs): Dataset[Row] = {
-if (SchemaUtils.fieldExists(rule.outputColumn, df.schema)) {
+if (df.schema.fieldExists(rule.outputColumn)) {
if (rule.outputColumn.contains('.')) {
conformNestedField(df)
} else {
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, FillNullsConformanceRule}
-import za.co.absa.enceladus.utils.schema.SchemaUtils
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.hats.Extensions._

import scala.util.{Failure, Success}
@@ -45,7 +45,7 @@ case class FillNullsRuleInterpreter(rule: FillNullsConformanceRule) extends Rule
rule.outputColumn
)

-val dataType: DataType = SchemaUtils.getFieldType(rule.inputColumn, df.schema).get
+val dataType: DataType = df.schema.getFieldType(rule.inputColumn).get
val default: Column = simpleLiteralCast(rule.value, dataType) match {
case Success(value) => value
case Failure(exception) =>
@@ -22,10 +22,10 @@ import za.co.absa.spark.hats.Extensions._
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, NegationConformanceRule}
-import za.co.absa.enceladus.utils.schema.SchemaUtils
import za.co.absa.enceladus.utils.types.GlobalDefaults
import za.co.absa.enceladus.utils.udf.UDFNames
import za.co.absa.enceladus.utils.validation.SchemaPathValidator
+import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.hats.transformations.NestedArrayTransformations

case class NegationRuleInterpreter(rule: NegationConformanceRule) extends RuleInterpreter {
Expand All @@ -37,7 +37,7 @@ case class NegationRuleInterpreter(rule: NegationConformanceRule) extends RuleIn
progArgs: InterpreterContextArgs): Dataset[Row] = {
NegationRuleInterpreter.validateInputField(progArgs.datasetName, df.schema, rule.inputColumn)

-val field = SchemaUtils.getField(rule.inputColumn, df.schema).get
+val field = df.schema.getField(rule.inputColumn).get

val negationErrUdfCall = callUDF(UDFNames.confNegErr, lit(rule.outputColumn), col(rule.inputColumn))
val errCol = "errCol"