diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/CommonJobExecution.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/CommonJobExecution.scala
index 61dc9ccd0..89f34c937 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/common/CommonJobExecution.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/common/CommonJobExecution.scala
@@ -327,12 +327,12 @@ trait CommonJobExecution extends ProjectMetadata {
 
   protected def addInfoColumns(intoDf: DataFrame, reportDate: String, reportVersion: Int): DataFrame = {
     import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
-    val function: (DataFrame, String) => DataFrame = (df: DataFrame, _) =>
+    val ifExistsFunc: (DataFrame, String) => DataFrame = (df: DataFrame, _) =>
       df.withColumn("errCol", lit(Array.emptyIntArray))
     intoDf
-      .withColumnIfDoesNotExist(function)(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
-      .withColumnIfDoesNotExist(function)(InfoDateColumnString, lit(reportDate))
-      .withColumnIfDoesNotExist(function)(InfoVersionColumn, lit(reportVersion))
+      .withColumnIfDoesNotExist(ifExistsFunc)(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
+      .withColumnIfDoesNotExist(ifExistsFunc)(InfoDateColumnString, lit(reportDate))
+      .withColumnIfDoesNotExist(ifExistsFunc)(InfoVersionColumn, lit(reportVersion))
   }
 
   private def getReportVersion[T](jobConfig: JobConfigParser[T], dataset: Dataset)(implicit hadoopConf: Configuration): Int = {
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/TypeParser.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/TypeParser.scala
index bccaa68cb..cfabcb863 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/TypeParser.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/TypeParser.scala
@@ -38,6 +38,7 @@ import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.spark.commons.utils.SchemaUtils
 import za.co.absa.spark.hofs.transform
+import za.co.absa.enceladus.utils.schema.{SchemaUtils => EnceladusSchemautils}
 
 import scala.reflect.runtime.universe._
 import scala.util.{Random, Try}
@@ -197,7 +198,6 @@ object TypeParser {
       logger.info(s"Creating standardization plan for Array $inputFullPathName")
       val origArrayType = origType.asInstanceOf[ArrayType] // this should never throw an exception because of `checkSetupForFailure`
       val arrayField = StructField(fieldInputName, fieldType.elementType, fieldType.containsNull, field.structField.metadata)
-      import za.co.absa.enceladus.utils.schema.{SchemaUtils => EnceladusSchemautils} // import rename
       val lambdaVariableName = s"${EnceladusSchemautils.unpath(inputFullPathName)}_${Random.nextLong().abs}"
       val lambda = (forCol: Column) => TypeParser(arrayField, path, forCol, origArrayType.elementType, failOnInputNotPerSchema, isArrayElement = true)
         .standardize()
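
The rename from `function` to `ifExistsFunc` is the point of the first change: the callback passed to spark-commons' `withColumnIfDoesNotExist` runs only when the target column *already exists* in the input, and here it appends an empty `errCol` rather than overwriting the pre-existing info column. Below is a minimal, self-contained sketch of that contract for reviewers unfamiliar with the extension method. The stand-in `withColumnIfDoesNotExistSketch`, the literal column names such as `enceladus_info_version`, and the local SparkSession setup are illustrative assumptions; the real behavior is defined by `DataFrameImplicits.DataFrameEnhancements` in spark-commons.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object IfExistsFuncSketch {

  // Stand-in for spark-commons' withColumnIfDoesNotExist (assumed semantics):
  // add `colExpr` under `colName` when the column is absent; when it is
  // already present, delegate to `ifExists` so the caller can react instead.
  implicit class DataFrameSketchOps(df: DataFrame) {
    def withColumnIfDoesNotExistSketch(ifExists: (DataFrame, String) => DataFrame)
                                      (colName: String, colExpr: Column): DataFrame =
      if (df.columns.contains(colName)) ifExists(df, colName)
      else df.withColumn(colName, colExpr)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("ifExistsFunc-sketch").getOrCreate()
    import spark.implicits._

    // Same callback shape as in addInfoColumns: ensure an empty errCol when
    // the info column was already supplied by the input data.
    val ifExistsFunc: (DataFrame, String) => DataFrame =
      (df, _) => df.withColumn("errCol", lit(Array.emptyIntArray))

    // "enceladus_info_version" already exists, so ifExistsFunc fires and adds
    // errCol; "report_date_str" is absent, so it is simply added.
    val df = Seq((1, 2)).toDF("value", "enceladus_info_version")
    df.withColumnIfDoesNotExistSketch(ifExistsFunc)("enceladus_info_version", lit(1))
      .withColumnIfDoesNotExistSketch(ifExistsFunc)("report_date_str", lit("2023-01-31"))
      .show(false)

    spark.stop()
  }
}
```

The second change is behavior-preserving: it hoists the `SchemaUtils => EnceladusSchemautils` rename import from the array branch of `TypeParser` to the top of the file, keeping the alias so it cannot collide with the already imported `za.co.absa.spark.commons.utils.SchemaUtils`.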