Skip to content

Commit

Permalink
#2022 feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
AdrianOlosutean committed Feb 26, 2022
1 parent 7738468 commit 85b452b
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ trait CommonJobExecution extends ProjectMetadata {

protected def addInfoColumns(intoDf: DataFrame, reportDate: String, reportVersion: Int): DataFrame = {
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
val function: (DataFrame, String) => DataFrame = (df: DataFrame, _) =>
val ifExistsFunc: (DataFrame, String) => DataFrame = (df: DataFrame, _) =>
df.withColumn("errCol", lit(Array.emptyIntArray))
intoDf
.withColumnIfDoesNotExist(function)(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
.withColumnIfDoesNotExist(function)(InfoDateColumnString, lit(reportDate))
.withColumnIfDoesNotExist(function)(InfoVersionColumn, lit(reportVersion))
.withColumnIfDoesNotExist(ifExistsFunc)(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
.withColumnIfDoesNotExist(ifExistsFunc)(InfoDateColumnString, lit(reportDate))
.withColumnIfDoesNotExist(ifExistsFunc)(InfoVersionColumn, lit(reportVersion))
}

private def getReportVersion[T](jobConfig: JobConfigParser[T], dataset: Dataset)(implicit hadoopConf: Configuration): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.commons.utils.SchemaUtils
import za.co.absa.spark.hofs.transform
import za.co.absa.enceladus.utils.schema.{SchemaUtils => EnceladusSchemautils}

import scala.reflect.runtime.universe._
import scala.util.{Random, Try}
Expand Down Expand Up @@ -197,7 +198,6 @@ object TypeParser {
logger.info(s"Creating standardization plan for Array $inputFullPathName")
val origArrayType = origType.asInstanceOf[ArrayType] // this should never throw an exception because of `checkSetupForFailure`
val arrayField = StructField(fieldInputName, fieldType.elementType, fieldType.containsNull, field.structField.metadata)
import za.co.absa.enceladus.utils.schema.{SchemaUtil => EnceladusSchemautils} // import rename
val lambdaVariableName = s"${EnceladusSchemautils.unpath(inputFullPathName)}_${Random.nextLong().abs}"
val lambda = (forCol: Column) => TypeParser(arrayField, path, forCol, origArrayType.elementType, failOnInputNotPerSchema, isArrayElement = true)
.standardize()
Expand Down

0 comments on commit 85b452b

Please sign in to comment.