#1461 FillNulls now uses simple literal cast
Zejnilovic authored Jul 29, 2020
1 parent 8925e71 commit a26b3f9
Showing 18 changed files with 156 additions and 55 deletions.
2 changes: 1 addition & 1 deletion dao/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion data-model/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion menas/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion migrations-cli/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion migrations/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion plugins-api/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion plugins-builtin/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion pom.xml
@@ -18,7 +18,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
<packaging>pom</packaging>

<name>Enceladus</name>
2 changes: 1 addition & 1 deletion spark-jobs/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
<version>2.10.0</version>
<version>2.10.1</version>
</parent>

<properties>
@@ -0,0 +1,22 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.conformance.interpreter.exceptions

import org.apache.spark.sql.types.DataType

case class InvalidDataTypeException(input: String, dataType: DataType) extends Exception(
s"Data type ${dataType.typeName} is not valid or not supported"
)
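Below is a brief, hypothetical sketch (not part of this commit) of how the new case class can be thrown and pattern-matched; the driver object and the ArrayType example are illustrative only.

```scala
import org.apache.spark.sql.types.{ArrayType, StringType}
import za.co.absa.enceladus.conformance.interpreter.exceptions.InvalidDataTypeException

// Hypothetical driver: array columns are not handled by the simple literal cast,
// so an ArrayType is a plausible trigger for this exception.
object InvalidDataTypeExceptionSketch {
  def main(args: Array[String]): Unit = {
    try {
      throw InvalidDataTypeException("some value", ArrayType(StringType))
    } catch {
      case InvalidDataTypeException(input, dt) =>
        println(s"Could not build a literal for '$input' as ${dt.typeName}")
    }
  }
}
```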
@@ -15,13 +15,17 @@

package za.co.absa.enceladus.conformance.interpreter.rules

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, RuleValidators}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, FillNullsConformanceRule}
import za.co.absa.enceladus.utils.schema.SchemaUtils
import za.co.absa.spark.hats.Extensions._
import org.apache.spark.sql.functions._
import za.co.absa.enceladus.conformance.config.ConformanceConfig

import scala.util.{Failure, Success}

object FillNullsRuleInterpreter {
final val ruleName = "Fill Nulls Rule"
@@ -41,22 +45,28 @@ case class FillNullsRuleInterpreter(rule: FillNullsConformanceRule) extends Rule
rule.outputColumn
)

val dataType: DataType = SchemaUtils.getFieldType(rule.inputColumn, df.schema).get
val default: Column = simpleLiteralCast(rule.value, dataType) match {
case Success(value) => value
case Failure(exception) =>
throw new ValidationException(
s"""Unable to cast literal ${rule.value} to $dataType
| for FillNulls conformance rule number ${rule.order}.""".stripMargin.replaceAll("[\\r\\n]", ""),
cause = exception)
}

if (rule.outputColumn.contains('.')) {
conformNestedField(df)
conformNestedField(df, default)
} else {
conformRootField(df)
conformRootField(df, default)
}
}

private def conformNestedField(df: Dataset[Row])(implicit spark: SparkSession): Dataset[Row] = {
df.nestedWithColumnExtended(rule.outputColumn, getField => when(
getField(rule.inputColumn).isNull, inferStrictestType(rule.value)
).otherwise(getField(rule.inputColumn)))
private def conformNestedField(df: Dataset[Row], default: Column)(implicit spark: SparkSession): Dataset[Row] = {
df.nestedWithColumnExtended(rule.outputColumn, getField => coalesce(getField(rule.inputColumn), default))
}

private def conformRootField(df: Dataset[Row])(implicit spark: SparkSession): Dataset[Row] = {
df.withColumn(rule.outputColumn, when(
col(rule.inputColumn).isNull, inferStrictestType(rule.value)
).otherwise(col(rule.inputColumn)))
private def conformRootField(df: Dataset[Row], default: Column)(implicit spark: SparkSession): Dataset[Row] = {
df.withColumn(rule.outputColumn, coalesce(col(rule.inputColumn), default))
}
}
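For context, here is a minimal, self-contained sketch (assuming a local SparkSession and a toy DataFrame, neither of which is part of the commit) of the behavioral idea above: nulls in the input column are filled with a literal already cast to the column's type, via coalesce, replacing the earlier when/otherwise construct around inferStrictestType.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

object FillNullsCoalesceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fill-nulls-coalesce-sketch")
      .getOrCreate()
    import spark.implicits._

    // Toy input: the "age" column contains a null that should be filled with 99.
    val df = Seq(("a", Some(1)), ("b", None)).toDF("name", "age")

    // The default is a typed literal, playing the role that simpleLiteralCast fills
    // for the input column's DataType; coalesce keeps non-null values unchanged.
    val default = lit(99)
    val conformed = df.withColumn("age_filled", coalesce(col("age"), default))

    conformed.show()
    spark.stop()
  }
}
```

Using coalesce with a pre-cast literal keeps the fill-in value's type aligned with the input column's schema, whereas the previous inferStrictestType path guessed the type from the string form of rule.value.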
@@ -15,11 +15,16 @@

package za.co.absa.enceladus.conformance.interpreter.rules

import java.math.BigDecimal
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.exceptions.InvalidDataTypeException
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.utils.transformations.ArrayTransformations
@@ -28,7 +33,6 @@ import scala.util.Try

trait RuleInterpreter {


/**
* Returns the conformance rule the interpreter is intended to interpret.
* The return value is optional since some interpreters are generated during conformance rules processing optimization
@@ -76,6 +80,42 @@ trait RuleInterpreter {
(intTry orElse longTry orElse doubleTry orElse boolTry) getOrElse lit(input)
}

/**
* Performs a simple type cast of the input based on the DataType
* @param input Value to be cast
* @param dataType DataType the value is to be cast to
* @return Column representation of the newly cast value
*/
def simpleLiteralCast(input: String, dataType: DataType): Try[Column] = {
Try({
dataType match {
case _: ByteType =>
lit(input.toByte)
case _: ShortType =>
lit(input.toShort)
case _: IntegerType =>
lit(input.toInt)
case _: LongType =>
lit(input.toLong)
case _: FloatType =>
lit(input.toFloat)
case _: DoubleType =>
lit(input.toDouble)
case _: BooleanType =>
lit(input.toBoolean)
case _: DecimalType =>
lit(new BigDecimal(input))
case _: TimestampType =>
lit(Timestamp.valueOf(input))
case _: DateType =>
lit(Date.valueOf(input))
case _: StringType =>
lit(input)
case _ => throw InvalidDataTypeException(input, dataType)
}
})
}

/**
* Helper function to handle arrays. If there's an array within the path of targetColumn, this helper will apply arrayTransform. When flat, it will apply the specified fn.
*
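As a rough, standalone illustration of the new helper's contract (the object name and the reduced set of handled types below are assumptions, not the committed code): a string is parsed according to the target DataType and wrapped as a typed literal Column, with parse errors surfacing as Failure.

```scala
import java.sql.Date

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType}

import scala.util.Try

object SimpleLiteralCastSketch {
  // Parse the string according to the target Spark DataType and wrap the result
  // as a typed literal Column; any parse error is captured as scala.util.Failure.
  def castLiteral(input: String, dataType: DataType): Try[Column] = Try {
    dataType match {
      case IntegerType => lit(input.toInt)
      case DateType    => lit(Date.valueOf(input))
      case StringType  => lit(input)
      case other       => throw new IllegalArgumentException(s"Unsupported type: ${other.typeName}")
    }
  }

  def main(args: Array[String]): Unit = {
    println(castLiteral("42", IntegerType))      // Success(...) wrapping a typed literal Column
    println(castLiteral("not-a-date", DateType)) // Failure(java.lang.IllegalArgumentException: ...)
  }
}
```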
@@ -15,4 +15,7 @@

package za.co.absa.enceladus.conformance.interpreter.rules

class ValidationException(val message: String, val techDetails: String = "") extends Exception(message)
class ValidationException(val message: String,
val techDetails: String = "",
cause: Throwable = None.orNull) extends Exception(message, cause)
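
A short usage sketch (a hypothetical call site, not taken from the commit) of the new cause parameter, which lets callers such as the FillNulls interpreter keep the original failure attached to the wrapper:

```scala
import za.co.absa.enceladus.conformance.interpreter.rules.ValidationException

object ValidationExceptionCauseSketch {
  def main(args: Array[String]): Unit = {
    // Wrap the underlying parse error so its stack trace stays attached to the wrapper.
    val underlying = new NumberFormatException("For input string: \"abc\"")
    val wrapped = new ValidationException(
      "Unable to cast literal abc to IntegerType for FillNulls conformance rule number 1.",
      cause = underlying)
    assert(wrapped.getCause eq underlying)
  }
}
```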

@@ -102,11 +102,11 @@ class CoalesceRuleSuite extends FunSuite with SparkTestBase with TestRuleBehavio
val inputDf: DataFrame = spark.createDataFrame(DeepArraySamples.ordersDataWithNulls)

val expected =
"""{"id":1,"name":"First Order","items":[{"itemid":"ar229","qty":10,"price":5.1,"payments":[{"payid":"pid10","amount":51.0}],"itemid2":"ar229"},{"itemid":"2891k","qty":100,"price":1.1,"payments":[{"payid":"zk20","amount":100.0}],"itemid2":"2891k"},{"itemid":"31239","qty":2,"price":55.2,"payments":[],"itemid2":"31239"}],"errCol":[]}
|{"id":2,"items":[{"itemid":"AkuYdg","qty":100,"price":10.0,"payments":[{"payid":"d101","amount":10.0},{"payid":"d102","amount":20.0}],"itemid2":"AkuYdg"},{"itemid":"jUa1k0","qty":2,"price":55.2,"payments":[],"itemid2":"jUa1k0"}],"errCol":[]}
"""{"id":1,"name":"First Order","date":"2025-11-15","items":[{"itemid":"ar229","qty":10,"price":5.1,"payments":[{"payid":"pid10","amount":51.0}],"itemid2":"ar229"},{"itemid":"2891k","qty":100,"price":1.1,"payments":[{"payid":"zk20","amount":100.0}],"itemid2":"2891k"},{"itemid":"31239","qty":2,"price":55.2,"payments":[],"itemid2":"31239"}],"errCol":[]}
|{"id":2,"date":"2019-03-12","items":[{"itemid":"AkuYdg","qty":100,"price":10.0,"payments":[{"payid":"d101","amount":10.0},{"payid":"d102","amount":20.0}],"itemid2":"AkuYdg"},{"itemid":"jUa1k0","qty":2,"price":55.2,"payments":[],"itemid2":"jUa1k0"}],"errCol":[]}
|{"id":3,"name":"Third Order","items":[{"qty":10,"price":10000.0,"payments":[{"payid":"pid10","amount":2000.0},{"payid":"pid10","amount":5000.0}],"itemid2":"coalesce fill-in value"},{"itemid":"Jdha2","qty":100,"price":45.0,"payments":[{"payid":"zk20","amount":150.0},{"payid":"pid10","amount":2000.0}],"itemid2":"Jdha2"}],"errCol":[]}
|{"id":4,"name":"Fourth Order","items":[{"itemid":"dLda1","qty":10,"price":5.1,"payments":[{"payid":"pid10","amount":10.0}],"itemid2":"dLda1"},{"itemid":"d2dhJ","qty":100,"price":1.1,"payments":[{"payid":"zk20","amount":15.0}],"itemid2":"d2dhJ"},{"itemid":"Mska0","qty":2,"price":55.2,"payments":[],"itemid2":"Mska0"},{"itemid":"Gdal1","qty":20,"price":5.2,"payments":[],"itemid2":"Gdal1"},{"itemid":"dakl1","qty":99,"price":1.2,"payments":[],"itemid2":"dakl1"}],"errCol":[]}
|{"id":5,"name":"Fifths order","items":[{"itemid":"hdUs1J","qty":50,"price":0.2,"payments":[{"payid":"pid10","amount":10.0},{"payid":"pid10","amount":11.0},{"payid":"pid10","amount":12.0}],"itemid2":"hdUs1J"}],"errCol":[]}"""
|{"id":4,"name":"Fourth Order","date":"2005-01-02","items":[{"itemid":"dLda1","qty":10,"price":5.1,"payments":[{"payid":"pid10","amount":10.0}],"itemid2":"dLda1"},{"itemid":"d2dhJ","qty":100,"price":1.1,"payments":[{"payid":"zk20","amount":15.0}],"itemid2":"d2dhJ"},{"itemid":"Mska0","qty":2,"price":55.2,"payments":[],"itemid2":"Mska0"},{"itemid":"Gdal1","qty":20,"price":5.2,"payments":[],"itemid2":"Gdal1"},{"itemid":"dakl1","qty":99,"price":1.2,"payments":[],"itemid2":"dakl1"}],"errCol":[]}
|{"id":5,"name":"Fifths order","date":"2009-05-21","items":[{"itemid":"hdUs1J","qty":50,"price":0.2,"payments":[{"payid":"pid10","amount":10.0},{"payid":"pid10","amount":11.0},{"payid":"pid10","amount":12.0}],"itemid2":"hdUs1J"}],"errCol":[]}"""
.stripMargin.replace("\r\n", "\n")

conformanceRuleShouldMatchExpected(inputDf, ordersDS, expected)