Skip to content

Commit

Permalink
#2022 feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
AdrianOlosutean committed Mar 8, 2022
1 parent 3b34913 commit ca144ec
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import za.co.absa.enceladus.utils.fs.{FileSystemUtils, HadoopFsUtils}
import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.utils.modules.SourcePhase.Standardization
import za.co.absa.enceladus.common.performance.PerformanceMeasurer
import za.co.absa.enceladus.utils.schema.SparkUtils.ifExistsErrorFunction
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer
import za.co.absa.enceladus.utils.validation.ValidationLevel
import scala.util.control.NonFatal
Expand Down Expand Up @@ -324,12 +323,11 @@ trait CommonJobExecution extends ProjectMetadata {
}

/**
 * Adds the report "info" columns to the given DataFrame.
 *
 * Three columns are populated from the arguments:
 *  - `InfoDateColumn`       - `reportDate` parsed into a date using `ReportDateFormat`
 *  - `InfoDateColumnString` - `reportDate` as a literal string
 *  - `InfoVersionColumn`    - `reportVersion` as a literal
 *
 * NOTE(review): this listing comes from a rendered diff, so both the older
 * `withColumnIfDoesNotExist(ifExistsErrorFunction(...))(...)` calls and the
 * `withColumnOverwriteIfExists(...)` calls that replace them appear in the chain below.
 * Only one of the two sets belongs to any single revision of the file - confirm against the repository.
 *
 * @param intoDf        the DataFrame to add the info columns to
 * @param reportDate    the report date as a string; expected to match `ReportDateFormat`
 * @param reportVersion the report version
 * @return the DataFrame returned by the chained column additions
 */
protected def addInfoColumns(intoDf: DataFrame, reportDate: String, reportVersion: Int): DataFrame = {
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
val dateLitWithFormat = to_date(lit(reportDate), ReportDateFormat)
import za.co.absa.enceladus.utils.schema.SparkUtils.DataFrameWithEnhancements
intoDf
.withColumnIfDoesNotExist(ifExistsErrorFunction(dateLitWithFormat))(InfoDateColumn, dateLitWithFormat)
.withColumnIfDoesNotExist(ifExistsErrorFunction(lit(reportDate)))(InfoDateColumnString, lit(reportDate))
.withColumnIfDoesNotExist(ifExistsErrorFunction(lit(reportVersion)))(InfoVersionColumn, lit(reportVersion))
.withColumnOverwriteIfExists(InfoDateColumn, to_date(lit(reportDate), ReportDateFormat))
.withColumnOverwriteIfExists(InfoDateColumnString, lit(reportDate))
.withColumnOverwriteIfExists(InfoVersionColumn, lit(reportVersion))
}

private def getReportVersion[T](jobConfig: JobConfigParser[T], dataset: Dataset)(implicit hadoopConf: Configuration): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import za.co.absa.enceladus.dao.rest.RestDaoFactory.AvailabilitySetup
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.model.{ConformedSchema, Dataset}
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.schema.SparkUtils.ifExistsErrorFunction
import za.co.absa.enceladus.utils.validation.ValidationLevel
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}

Expand Down Expand Up @@ -82,10 +81,11 @@ class HyperConformance (menasBaseUrls: List[String],
implicit val hdfsUtils: HadoopFsUtils = HadoopFsUtils.getOrCreate(hdfs)
val dataFormat = coalesce(date_format(infoDateColumn, "yyyy-MM-dd"), lit(""))
val currentDateColumn = current_date()
import za.co.absa.enceladus.utils.schema.SparkUtils.DataFrameWithEnhancements
val conformedDf = DynamicInterpreter().interpret(conformance, rawDf)
.withColumnIfDoesNotExist(ifExistsErrorFunction(currentDateColumn))(InfoDateColumn, coalesce(infoDateColumn, currentDateColumn))
.withColumnIfDoesNotExist(ifExistsErrorFunction(dataFormat))(InfoDateColumnString, dataFormat)
.withColumnIfDoesNotExist(ifExistsErrorFunction(infoVersionColumn))(InfoVersionColumn, infoVersionColumn)
.withColumnOverwriteIfExists(InfoDateColumn, coalesce(infoDateColumn, currentDateColumn))
.withColumnOverwriteIfExists(InfoDateColumnString, dataFormat)
.withColumnOverwriteIfExists(InfoVersionColumn, infoVersionColumn)
conformedDf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.udf.UDFLibrary
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.spark.hats.transformations.NestedArrayTransformations


Expand Down Expand Up @@ -86,7 +87,11 @@ object SparkUtils {
dfWithAggregatedErrColumn.drop(tmpColumn)
}

/**
 * Builds a `(DataFrame, String) => DataFrame` callback bound to the given column expression.
 *
 * The returned function takes a DataFrame and a column name and delegates to
 * `overwriteWithErrorColumn(df, colName, colExpr)`. Callers pass it as the first argument of
 * `withColumnIfDoesNotExist`.
 *
 * @param colExpr the column expression forwarded to `overwriteWithErrorColumn`
 * @return a function that applies `overwriteWithErrorColumn` with `colExpr` to a DataFrame and column name
 */
def ifExistsErrorFunction(colExpr: Column): (DataFrame, String) => DataFrame =
(df: DataFrame, colName: String) => overwriteWithErrorColumn(df, colName, colExpr)
/**
 * Extension methods for `DataFrame`; bring them into scope with
 * `import SparkUtils.DataFrameWithEnhancements`.
 *
 * @param df the DataFrame being enhanced
 */
implicit class DataFrameWithEnhancements(val df: DataFrame) {
/**
 * Adds the column `colName` computed from `colExpr` through `withColumnIfDoesNotExist`,
 * supplying `overwriteWithErrorColumn(_, _, colExpr)` as its handler function.
 *
 * NOTE(review): presumably the handler is invoked only when a column named `colName` already
 * exists (as the names suggest); neither `withColumnIfDoesNotExist` nor
 * `overwriteWithErrorColumn` is defined in this listing - confirm against their sources.
 *
 * @param colName name of the column to add
 * @param colExpr expression producing the column's value
 * @return the DataFrame returned by `withColumnIfDoesNotExist`
 */
def withColumnOverwriteIfExists(colName: String, colExpr: Column): DataFrame = {
// Bind colExpr so the handler matches the (DataFrame, String) => DataFrame shape expected below.
val overwrite: (DataFrame, String) => DataFrame = overwriteWithErrorColumn(_, _, colExpr)
df.withColumnIfDoesNotExist(overwrite)(colName, colExpr)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.enceladus.utils.schema.SparkUtils.DataFrameWithEnhancements

class SparkUtilsSuite extends AnyFunSuite with SparkTestBase {

Expand All @@ -46,7 +47,7 @@ class SparkUtilsSuite extends AnyFunSuite with SparkTestBase {
}

private val colExpression: Column = lit(1)
test("Test withColumnIfNotExist() when the column does not exist") {
test("Test withColumnOverwriteIfExists() when the column does not exist") {
val expectedOutput =
"""+-----+---+
||value|foo|
Expand All @@ -62,7 +63,7 @@ class SparkUtilsSuite extends AnyFunSuite with SparkTestBase {

val dfIn = getDummyDataFrame

val dfOut = dfIn.withColumnIfDoesNotExist(SparkUtils.ifExistsErrorFunction(colExpression))("foo", colExpression)
val dfOut = dfIn.withColumnOverwriteIfExists("foo", colExpression)
val actualOutput = dfOut.dataAsString(truncate = false)

assert(dfOut.schema.length == 2)
Expand All @@ -71,7 +72,7 @@ class SparkUtilsSuite extends AnyFunSuite with SparkTestBase {
assert(actualOutput == expectedOutput)
}

test("Test withColumnIfNotExist() when the column exists") {
test("Test withColumnOverwriteIfExists() when the column exists") {
val expectedOutput =
"""+-----+----------------------------------------------------------------------------------------------+
||value|errCol |
Expand All @@ -86,15 +87,15 @@ class SparkUtilsSuite extends AnyFunSuite with SparkTestBase {
|""".stripMargin.replace("\r\n", "\n")

val dfIn = getDummyDataFrame
val dfOut = dfIn.withColumnIfDoesNotExist(SparkUtils.ifExistsErrorFunction(colExpression))("value", colExpression)
val dfOut = dfIn.withColumnOverwriteIfExists("value", colExpression)
val actualOutput = dfOut.dataAsString(truncate = false)

assert(dfIn.schema.length == 1)
assert(dfIn.schema.head.name == "value")
assert(actualOutput == expectedOutput)
}

test("Test withColumnIfNotExist() when the column exists, but has a different case") {
test("Test withColumnOverwriteIfExists() when the column exists, but has a different case") {
val expectedOutput =
"""+-----+----------------------------------------------------------------------------------------------+
||vAlUe|errCol |
Expand All @@ -109,7 +110,7 @@ class SparkUtilsSuite extends AnyFunSuite with SparkTestBase {
|""".stripMargin.replace("\r\n", "\n")

val dfIn = getDummyDataFrame
val dfOut = dfIn.withColumnIfDoesNotExist(SparkUtils.ifExistsErrorFunction(colExpression))("vAlUe", colExpression)
val dfOut = dfIn.withColumnOverwriteIfExists("vAlUe", colExpression)
val actualOutput = dfOut.dataAsString(truncate = false)

assert(dfIn.schema.length == 1)
Expand Down

0 comments on commit ca144ec

Please sign in to comment.