From 603f3ac6b4fe4ea49526bc063dfa82f2eb30f8c8 Mon Sep 17 00:00:00 2001
From: Daniel Kavan
Date: Wed, 12 May 2021 10:00:12 +0200
Subject: [PATCH] #417 SparkXML-related unit test added first (regression
 guard); SparkXMLHack removed; the test still passes.

---
 .../StandardizationInterpreter.scala          | 13 +--
 .../interpreter/stages/SparkXMLHack.scala     | 45 ----------
 .../data/standardization_xml_suite_data.txt   |  4 +
 .../StandardizationXmlSuite.scala             | 84 +++++++++++++++++++
 4 files changed, 90 insertions(+), 56 deletions(-)
 delete mode 100644 spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/SparkXMLHack.scala
 create mode 100644 spark-jobs/src/test/resources/data/standardization_xml_suite_data.txt
 create mode 100644 spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationXmlSuite.scala

diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/StandardizationInterpreter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/StandardizationInterpreter.scala
index a38ded1b9..941788920 100644
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/StandardizationInterpreter.scala
+++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/StandardizationInterpreter.scala
@@ -22,7 +22,7 @@ import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.enceladus.common.{Constants, RecordIdGeneration}
 import za.co.absa.enceladus.common.RecordIdGeneration._
 import za.co.absa.enceladus.standardization.interpreter.dataTypes._
-import za.co.absa.enceladus.standardization.interpreter.stages.{SchemaChecker, SparkXMLHack, TypeParser}
+import za.co.absa.enceladus.standardization.interpreter.stages.{SchemaChecker, TypeParser}
 import za.co.absa.enceladus.utils.error.ErrorMessage
 import za.co.absa.enceladus.utils.schema.{SchemaUtils, SparkUtils}
 import za.co.absa.enceladus.utils.transformations.ArrayTransformations
@@ -54,17 +54,8 @@ object StandardizationInterpreter {
     logger.info(s"Step 1: Schema validation")
     validateSchemaAgainstSelfInconsistencies(expSchema)
 
-    // TODO: remove when spark-xml handles empty arrays #417
-    val dfXmlSafe: Dataset[Row] = if (inputType.toLowerCase() == "xml") {
-      df.select(expSchema.fields.map { field: StructField =>
-        SparkXMLHack.hack(field, "", df).as(field.name)
-      }: _*)
-    } else {
-      df
-    }
-
     logger.info(s"Step 2: Standardization")
-    val std = standardizeDataset(dfXmlSafe, expSchema, failOnInputNotPerSchema)
+    val std = standardizeDataset(df, expSchema, failOnInputNotPerSchema)
 
     logger.info(s"Step 3: Clean the final error column")
     val cleanedStd = cleanTheFinalErrorColumn(std)
diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/SparkXMLHack.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/SparkXMLHack.scala
deleted file mode 100644
index 682e8235f..000000000
--- a/spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/interpreter/stages/SparkXMLHack.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2018 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.enceladus.standardization.interpreter.stages
-
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
-import za.co.absa.enceladus.utils.schema.SchemaUtils.appendPath
-import za.co.absa.enceladus.utils.transformations.ArrayTransformations.arrCol
-import za.co.absa.enceladus.utils.udf.UDFLibrary
-
-/**
-  * Hack around spark-xml bug: Null arrays produce array(null) instead of null.
-  *
-  * Get rid of this as soon as this is fixed in spark-xml
-  */
-
-object SparkXMLHack {
-
-  def hack(field: StructField, path: String, df: Dataset[Row])(implicit spark: SparkSession, udfLib: UDFLibrary): Column = {
-    val currentAttrPath = appendPath(path, field.name)
-
-    field.dataType match {
-      case a @ ArrayType(elType, nullable) =>
-        when((size(arrCol(currentAttrPath)) === 1) and arrCol(currentAttrPath)(0).isNull, lit(null)).otherwise(arrCol(currentAttrPath)) as field.name // scalastyle:ignore null
-      case t: StructType =>
-        struct(t.fields.toSeq.map(x => hack(x, currentAttrPath, df)): _*) as field.name
-      case _ =>
-        arrCol(currentAttrPath) as field.name
-    }
-  }
-}
diff --git a/spark-jobs/src/test/resources/data/standardization_xml_suite_data.txt b/spark-jobs/src/test/resources/data/standardization_xml_suite_data.txt
new file mode 100644
index 000000000..8facae313
--- /dev/null
+++ b/spark-jobs/src/test/resources/data/standardization_xml_suite_data.txt
@@ -0,0 +1,4 @@
+<instrument><reportDate>2018-08-10</reportDate><rowId>1</rowId><legs><leg><price>1000</price></leg></legs></instrument>
+<instrument><reportDate>2018-08-10</reportDate><rowId>2</rowId><legs><leg><price>2000</price></leg></legs></instrument>
+<instrument><reportDate>2018-08-10</reportDate><rowId>3</rowId><legs><leg></leg></legs></instrument>
+<instrument><reportDate>2018-08-10</reportDate><rowId>4</rowId></instrument>
diff --git a/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationXmlSuite.scala b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationXmlSuite.scala
new file mode 100644
index 000000000..7b2969020
--- /dev/null
+++ b/spark-jobs/src/test/scala/za/co/absa/enceladus/standardization/StandardizationXmlSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package za.co.absa.enceladus.standardization + +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types +import org.apache.spark.sql.types._ +import org.mockito.scalatest.MockitoSugar +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.enceladus.dao.MenasDAO +import za.co.absa.enceladus.model.Dataset +import za.co.absa.enceladus.standardization.config.StandardizationConfig +import za.co.absa.enceladus.standardization.interpreter.StandardizationInterpreter +import za.co.absa.enceladus.standardization.interpreter.stages.PlainSchemaGenerator +import za.co.absa.enceladus.utils.implicits.DataFrameImplicits.DataFrameEnhancements +import za.co.absa.enceladus.utils.testUtils.SparkTestBase +import za.co.absa.enceladus.utils.udf.UDFLibrary + +class StandardizationXmlSuite extends AnyFunSuite with SparkTestBase with MockitoSugar{ + private implicit val udfLibrary:UDFLibrary = new UDFLibrary() + + private val standardizationReader = new StandardizationPropertiesProvider() + + test("Reading data from XML input") { + + implicit val dao: MenasDAO = mock[MenasDAO] + + val args = ("--dataset-name Foo --dataset-version 1 --report-date 2018-08-10 --report-version 1 " + + "--menas-auth-keytab src/test/resources/user.keytab.example " + + "--raw-format xml --row-tag instrument").split(" ") + + val dataSet = Dataset("SpecialChars", 1, None, "", "", "SpecialChars", 1, conformance = Nil) + val cmd = StandardizationConfig.getFromArguments(args) + + val csvReader = standardizationReader.getFormatSpecificReader(cmd, dataSet) + + val baseSchema = StructType(Array( + StructField("rowId", LongType), + StructField("reportDate", StringType), + StructField("legs", types.ArrayType(StructType(Array( + StructField("leg", StructType(Array( + StructField("price", IntegerType) + ))) + )))) + )) + val inputSchema = PlainSchemaGenerator.generateInputSchema(baseSchema, Option("_corrupt_record")) + val reader = csvReader.schema(inputSchema) + + val sourceDF = reader.load("src/test/resources/data/standardization_xml_suite_data.txt") + // not expecting corrupted records, but checking to be sure + val corruptedRecords = sourceDF.filter(col("_corrupt_record").isNotNull) + assert(corruptedRecords.isEmpty, s"Unexpected corrupted records found: ${corruptedRecords.collectAsList()}") + + val destDF = StandardizationInterpreter.standardize(sourceDF, baseSchema, cmd.rawFormat) + + val actual = destDF.dataAsString(truncate = false) + val expected = + """+-----+----------+----------+------+ + ||rowId|reportDate|legs |errCol| + |+-----+----------+----------+------+ + ||1 |2018-08-10|[[[1000]]]|[] | + ||2 |2018-08-10|[[[2000]]]|[] | + ||3 |2018-08-10|[[[]]] |[] | + ||4 |2018-08-10|null |[] | + |+-----+----------+----------+------+ + | + |""".stripMargin.replace("\r\n", "\n") + + assert(actual == expected) + } +}