diff --git a/.gitignore b/.gitignore index decf5d1..5889d55 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ metastore_db/ # Scala-IDE specific .scala_dependencies .worksheet +/.bsp/sbt.json diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..2710165 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,46 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [6.0.0] - 2020-05 +### Migrated +- Spark 3.0 Migration + * Migrate to Spark version 3.0.1, Hadoop 3.2.1 and Scala 2.12 + * Spark 3 uses the Proleptic Gregorian calendar. + In case there are problems when data sources have dates before 1582 or other problematic formats, as a quick fix we can set the + following Spark parameters in the pipelines: + ``` + "spark.sql.legacy.timeParserPolicy": "LEGACY", "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY", "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "LEGACY" + ``` + An example of an exception related to parsing dates and timestamps looks like this: + ``` + SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '00/00/0000' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string. + ``` + Note 1: there are also two other exceptions that we observed related to reading or writing Parquet files with old date/time formats. + They look very similar to the Spark upgrade exception above, but highlight the need to change the respective spark.sql.legacy.parquet.datetimeRebaseModeInXXXXX property. + Note 2: the solution provided above should cover all the exceptions enumerated here for a given data source. + +## [5.8.0] - 2020-04 +### Added +- Improve reconciliation execution time by removing an unneeded caching stage. + +## [5.7.5] - 2020-04 +### Added +- Enable multi-line option for append loads +- Fix duplicate issues generated by the latest changes applied to CompetitorDataPreprocessor + +## [5.7.2] - 2021-02 +### Added +- Make init condensation optional, but true by default. + +## [5.7.1] - 2020-02 +### Added +- Modify append load to support more complex partitioning strategies without file_regex +- Added support for configuring write load mode and num output files in append load +- Support for specifying the quote and escape characters. More info on how to specify those here: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html + +## [5.7.0] - 2020-01 +### Added +- Support for multiple partition attributes (non date-derived) and single non date-derived partition attributes.
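(For reference, a minimal sketch of how the legacy datetime settings quoted in the changelog entry above could be applied when a SparkSession is built, assuming they are not already injected as pipeline Spark parameters; the app name and builder-style call are illustrative only and not part of this change set.)

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: apply the Spark 3 legacy datetime settings at session
// creation time instead of passing them as pipeline parameters.
val spark = SparkSession
  .builder()
  .appName("legacy-datetime-example") // illustrative app name, not from this repo
  .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
  .getOrCreate()
```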
\ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 1fee3de..8de71b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM amazonlinux:2.0.20200722.0 +FROM amazonlinux:2.0.20210326.0 ARG JAVA_VERSION=1.8.0 @@ -17,4 +17,4 @@ RUN groupadd -r m3d && \ chown m3d:m3d /home/m3d USER m3d -CMD ["/bin/bash"] +CMD ["/bin/bash"] \ No newline at end of file diff --git a/README.md b/README.md index 5debf9a..f02c56b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ M3D Engine ======= -![M3D logo](/static/images/m3d_logo.png) +![M3DLogo](static/images/m3d_logo.png) **M3D** stands for _Metadata Driven Development_ and is a cloud and platform agnostic framework for the automated creation, management and governance of metadata and data flows from multiple source to multiple target systems. The main features and design goals of M3D are: diff --git a/build.sbt b/build.sbt index 189aaf4..cc690de 100644 --- a/build.sbt +++ b/build.sbt @@ -1,19 +1,20 @@ import sbt.ExclusionRule name := "m3d-engine" -version := "1.0" +version := "6.0.0" -scalaVersion := "2.11.12" +scalaVersion := "2.12.13" +addCompilerPlugin("org.scalameta" % "semanticdb-scalac" % "4.4.11" cross CrossVersion.full) +scalacOptions += "-Yrangepos" semanticdbEnabled := true -semanticdbVersion := scalafixSemanticdb.revision scalacOptions += "-Ywarn-unused-import" -val sparkVersion = "2.4.4" -val hadoopVersion = "2.8.5" +val sparkVersion = "3.0.1" +val hadoopVersion = "3.2.1" conflictManager := sbt.ConflictManager.latestRevision -mainClass in Compile := Some("com.adidas.analytics.AlgorithmFactory") +Compile / mainClass := Some("com.adidas.analytics.AlgorithmFactory") /* ===================== * Dependencies @@ -32,21 +33,23 @@ libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion % Provi libraryDependencies += "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided withExclusions Vector( ExclusionRule("io.netty", "netty-all") ) + libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % Provided +libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs-client" % hadoopVersion % Provided libraryDependencies += "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion % Provided -libraryDependencies += "joda-time" % "joda-time" % "2.9.3" % Provided +libraryDependencies += "joda-time" % "joda-time" % "2.10.10" % Provided libraryDependencies += "org.joda" % "joda-convert" % "2.2.1" libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.30" -libraryDependencies += "io.delta" %% "delta-core" % "0.6.1" +libraryDependencies += "io.delta" %% "delta-core" % "0.8.0" /* ===================== * Dependencies for test * ===================== */ -libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.1" % Test +libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.7" % Test libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % Test classifier "tests" withExclusions Vector( @@ -57,10 +60,10 @@ libraryDependencies += ExclusionRule("io.netty", "netty-all") ) -fork in Test := true +Test / fork := true // disable parallel execution -parallelExecution in Test := false +Test / parallelExecution := false // skipping tests when running assembly -test in assembly := {} +assembly / test := {} diff --git a/common.sh b/common.sh index 915c71f..771546d 100644 --- a/common.sh +++ b/common.sh @@ -1,5 +1,4 @@ -#!/bin/bash - +#!/bin/sh function array_contains() { local LOCAL_NEEDLE=$1 shift diff --git a/dev-env.sh b/dev-env.sh index 8c6f3ff..ac79e03 100755 --- 
a/dev-env.sh +++ b/dev-env.sh @@ -1,5 +1,4 @@ -#!/bin/bash - +#!/bin/sh set -e SCRIPT_NAME="dev-env.sh" @@ -15,6 +14,7 @@ OPTION_INTERACTIVE=("interactive" "i" "use interactive mode and allocate pseudo- ARG_ACTION_IMAGE_BUILD=("image-build" "build the docker image") ARG_ACTION_CONTAINER_RUN=("container-run" "run a container from the docker image") ARG_ACTION_CONTAINER_EXECUTE=("container-execute" "execute an external command within the container") +ARG_ACTION_CONTAINER_START=("container-start" "start the container") ARG_ACTION_CONTAINER_STOP=("container-stop" "stop the container") ARG_ACTION_CONTAINER_DELETE=("container-delete" "delete the container") ARG_ACTION_PROJECT_FORMAT=("project-format" "format the code") @@ -28,6 +28,7 @@ AVAILABLE_ACTIONS=( "$ARG_ACTION_IMAGE_BUILD" "$ARG_ACTION_CONTAINER_RUN" "$ARG_ACTION_CONTAINER_EXECUTE" + "$ARG_ACTION_CONTAINER_START" "$ARG_ACTION_CONTAINER_STOP" "$ARG_ACTION_CONTAINER_DELETE" "$ARG_ACTION_PROJECT_FORMAT" @@ -47,6 +48,7 @@ function create_actions_help_string() { "ARG_ACTION_IMAGE_BUILD" \ "ARG_ACTION_CONTAINER_RUN" \ "ARG_ACTION_CONTAINER_EXECUTE" \ + "ARG_ACTION_CONTAINER_START" \ "ARG_ACTION_CONTAINER_STOP" \ "ARG_ACTION_CONTAINER_DELETE" \ "ARG_ACTION_PROJECT_ASSEMBLE" \ @@ -175,6 +177,14 @@ elif [[ "$ACTION" == "$ARG_ACTION_CONTAINER_RUN" ]]; then docker run -t -d --name "$CONTAINER_INSTANCE_NAME" -v "${WORKSPACE}:/m3d/workspace/${PROJECT_NAME}" -p 5005:5005 "$CONTAINER_IMAGE_NAME" fi +# start the container +elif [[ "$ACTION" == "$ARG_ACTION_CONTAINER_START" ]]; then + echo "Starting dev environment ..." + validate_args_are_empty "$HELP_STRING" "${OTHER_ARGS[@]}" + + echo "Starting the container ..." + (docker start "$CONTAINER_INSTANCE_NAME" 1>/dev/null && echo "The container was started") + # cleanup files generated by SBT elif [[ "$ACTION" == "$ARG_ACTION_PROJECT_CLEAN" ]]; then echo "Deleting files generated by SBT ..." 
@@ -221,12 +231,12 @@ elif [[ "$ACTION" == "$ARG_ACTION_PROJECT_TEST" ]]; then validate_args_are_empty "$HELP_STRING" "${OTHER_ARGS[@]}" if [[ -z "$DEBUG" ]]; then - SBT_OPTS="-Xms512M -Xmx512M" + SBT_OPTS="-Xms512M -Xmx512M -XX:+UseG1GC" SBT_CMD="SBT_OPTS=\"${SBT_OPTS}\" sbt \"test:testOnly ${TEST_FILTER}\"" exec_command_within_container "$CONTAINER_INSTANCE_NAME" "$PROJECT_NAME" "$SBT_CMD" "$INTERACTIVE" else echo "Debugging is enabled" - SBT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms512M -Xmx512M" + SBT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms512M -Xmx512M -XX:+UseG1GC" SBT_CMD="SBT_OPTS=\"${SBT_OPTS}\" sbt \";set Test / fork := false; test:testOnly ${TEST_FILTER}\"" exec_command_within_container "$CONTAINER_INSTANCE_NAME" "$PROJECT_NAME" "$SBT_CMD" 1 fi diff --git a/project/build.properties b/project/build.properties index 302b6be..52a0b0e 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.3.13 \ No newline at end of file +sbt.version = 1.5.0 \ No newline at end of file diff --git a/src/main/scala/com/adidas/analytics/AlgorithmFactory.scala b/src/main/scala/com/adidas/analytics/AlgorithmFactory.scala index e29e1ec..9078ff0 100644 --- a/src/main/scala/com/adidas/analytics/AlgorithmFactory.scala +++ b/src/main/scala/com/adidas/analytics/AlgorithmFactory.scala @@ -44,9 +44,13 @@ object AlgorithmFactory { .config("hive.stats.fetch.column.stats", "true") .config("hive.stats.fetch.partition.stats", "true") .config("spark.sql.parquet.compression.codec", "snappy") - .config("spark.sql.parquet.writeLegacyFormat", "true") .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") .config("spark.sql.csv.parser.columnPruning.enabled", "false") + /* We maintain this option to ensure that Hive and Spark < 3.0 downstream consumers can + * properly read the Parquet data with appropriate data types and date formats. */ + .config("spark.sql.parquet.writeLegacyFormat", "true") + // Configs for delta lake load... more specifics added in the DeltaLakeLoad config + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .enableHiveSupport() .getOrCreate() diff --git a/src/main/scala/com/adidas/analytics/algo/GzipDecompressor.scala b/src/main/scala/com/adidas/analytics/algo/GzipDecompressor.scala index aff96e4..cdc0067 100644 --- a/src/main/scala/com/adidas/analytics/algo/GzipDecompressor.scala +++ b/src/main/scala/com/adidas/analytics/algo/GzipDecompressor.scala @@ -45,7 +45,7 @@ final class GzipDecompressor protected ( Future.sequence( fileSystem .ls(inputDirectoryPath, recursive) - .filterNot(path => fileSystem.isDirectory(path)) + .filterNot(path => fileSystem.getFileStatus(path).isDirectory) .map { compressedFilePath => Future { val decompressedFilePath = @@ -54,9 +54,8 @@ final class GzipDecompressor protected ( if (compressedFilePath.getName.endsWith(".zip")) { val zin = new ZipInputStream(fileSystem.open(compressedFilePath)) /* Warning: we intentionally only support zip files with one entry here as we want - * to control */ - /* the output name and can not merge multiple entries because they may have - * headers. */ + * to control the output name and can not merge multiple entries because they may + * have headers. 
*/ zin.getNextEntry zin } else { diff --git a/src/main/scala/com/adidas/analytics/algo/loads/AppendLoad.scala b/src/main/scala/com/adidas/analytics/algo/loads/AppendLoad.scala index b542e48..e24c9a1 100644 --- a/src/main/scala/com/adidas/analytics/algo/loads/AppendLoad.scala +++ b/src/main/scala/com/adidas/analytics/algo/loads/AppendLoad.scala @@ -14,6 +14,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.slf4j.{Logger, LoggerFactory} + import scala.collection.immutable /** Performs append load of new records to an existing table. @@ -27,7 +28,18 @@ final class AppendLoad protected ( with TableStatistics with DateComponentDerivation { - override protected def read(): Vector[DataFrame] = readInputData(targetSchema, spark, dfs) + override protected def read(): Vector[DataFrame] = + if (regexFilename.nonEmpty || partitionSourceColumn.nonEmpty) + readInputData(targetSchema, spark, dfs) + else + Vector( + readInputFiles( + Seq(inputDir), + fileFormat, + if (!verifySchema) Some(targetSchema) else None, + spark.read.options(sparkReaderOptions) + ) + ) override protected def transform(dataFrames: Vector[DataFrame]): Vector[DataFrame] = if (partitionSourceColumn.nonEmpty) @@ -80,7 +92,7 @@ final class AppendLoad protected ( ): Seq[Source] = { logger.info(s"Looking for input files in $inputDirPath") val targetSchemaWithoutTargetPartitions = - getSchemaWithouttargetPartitions(targetSchema, targetPartitions.toSet) + getSchemaWithoutTargetPartitions(targetSchema, targetPartitions.toSet) if (partitionSourceColumn.nonEmpty) fs.ls(inputDirPath, recursive = true) .map(sourcePath => Source(targetSchemaWithoutTargetPartitions, sourcePath.toString)) @@ -169,19 +181,19 @@ final class AppendLoad protected ( private def readSources(sources: Seq[Source], spark: SparkSession): Vector[DataFrame] = groupSourcesBySchema(sources).map { case (schema, inputPaths) => - readInputFiles(inputPaths, fileFormat, schema, spark.read.options(sparkReaderOptions)) + readInputFiles(inputPaths, fileFormat, Some(schema), spark.read.options(sparkReaderOptions)) }.toVector private def readInputFiles( inputPaths: Seq[String], fileFormat: String, - schema: StructType, + schema: Option[StructType], reader: DataFrameReader ): DataFrame = fileFormat match { - case "dsv" => DSVFormat(Some(schema)).read(reader, inputPaths: _*) - case "parquet" => ParquetFormat(Some(schema)).read(reader, inputPaths: _*) - case "json" => JSONFormat(Some(schema)).read(reader, inputPaths: _*) + case "dsv" => DSVFormat(schema, multiLine = isMultiline).read(reader, inputPaths: _*) + case "parquet" => ParquetFormat(schema).read(reader, inputPaths: _*) + case "json" => JSONFormat(schema, multiLine = isMultiline).read(reader, inputPaths: _*) case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat") } } @@ -197,7 +209,7 @@ object AppendLoad { private def extractPathWithoutServerAndProtocol(path: String): String = path.replaceFirst("\\w+\\d*://.+?/", "") - private def getSchemaWithouttargetPartitions( + private def getSchemaWithoutTargetPartitions( targetSchema: StructType, targetPartitions: Set[String] ): StructType = @@ -263,7 +275,7 @@ object AppendLoad { val fs = dfs.getFileSystem(headerDirPath) dataFrames.foreach { df => val schemaJson = - getSchemaWithouttargetPartitions(df.schema, targetPartitions.toSet).prettyJson + getSchemaWithoutTargetPartitions(df.schema, targetPartitions.toSet).prettyJson df.collectPartitions(targetPartitions).foreach { partitionCriteria 
=> val subdirectories = DataFrameUtils.mapPartitionsToDirectories(partitionCriteria) val headerPath = new Path(headerDirPath.join(subdirectories), headerFileName) diff --git a/src/main/scala/com/adidas/analytics/algo/loads/DeltaLakeLoad.scala b/src/main/scala/com/adidas/analytics/algo/loads/DeltaLakeLoad.scala index dc92514..e638450 100644 --- a/src/main/scala/com/adidas/analytics/algo/loads/DeltaLakeLoad.scala +++ b/src/main/scala/com/adidas/analytics/algo/loads/DeltaLakeLoad.scala @@ -28,7 +28,7 @@ final class DeltaLakeLoad protected ( case "parquet" => ParquetFormat() case "dsv" => DSVFormat() case "json" => - JSONFormat(multiLine = isMultilineJSON.getOrElse(false), optionalSchema = readJsonSchema) + JSONFormat(multiLine = isMultiline, optionalSchema = readJsonSchema) case _ => throw new RuntimeException(s"Unsupported input data format $fileFormat.") } @@ -81,12 +81,14 @@ final class DeltaLakeLoad protected ( deltaTableDF } else { - val condensedNewDataDF = condenseNewData(newDataDF, initLoad = true) - - affectedPartitions = condensedNewDataDF.collectPartitions(targetPartitions) - - initDeltaTable(condensedNewDataDF) + val initDataDF = + if (initCondensation) + condenseNewData(newDataDF, initLoad = true) + else + newDataDF + affectedPartitions = initDataDF.collectPartitions(targetPartitions) + initDeltaTable(initDataDF) DeltaTable.forPath(deltaTableDir).toDF } @@ -251,7 +253,10 @@ final class DeltaLakeLoad protected ( /** Given a collection of column names it builds the adequate delta merge match condition. E.g.: * Given Seq("sales_id", "sales_date") it returns currentData.sales_id = newData.sales_id AND - * currentData.sales_date = newData.sales_date + * currentData.sales_date = newData.sales_date. If affectedPartitionsMerge is true, then the + * merge condition will include the columns of the business key plus a filter with the affected + * partitions, otherwise it will include the columns of the business key plus the partition + * columns. * * @param columns * list of column names to build the match condition @@ -264,9 +269,15 @@ final class DeltaLakeLoad protected ( targetPartitions match { case _ :: _ => - if (!ignoreAffectedPartitionsMerge) generateCondition(businessKey.union(targetPartitions)) - else - s"${generateCondition(businessKey)} $logicalOperator (${generateAffectedPartitionsWhere(forceAdditionOfNullPartitionCriteria(affectedPartitions, targetPartitions), s"$currentDataAlias.")})" + if (!affectedPartitionsMerge) + generateCondition(businessKey.union(targetPartitions)) + else { + val affectedPartitionsWhere = generateAffectedPartitionsWhere( + forceAdditionOfNullPartitionCriteria(affectedPartitions, targetPartitions), + s"$currentDataAlias." 
+ ) + s"${generateCondition(businessKey)} $logicalOperator ($affectedPartitionsWhere)" + } case _ => generateCondition(businessKey) } } diff --git a/src/main/scala/com/adidas/analytics/algo/loads/FullLoad.scala b/src/main/scala/com/adidas/analytics/algo/loads/FullLoad.scala index da00279..47e410e 100644 --- a/src/main/scala/com/adidas/analytics/algo/loads/FullLoad.scala +++ b/src/main/scala/com/adidas/analytics/algo/loads/FullLoad.scala @@ -25,9 +25,9 @@ final class FullLoad protected ( try { val dataFormat: DataFormat = fileFormat match { case "parquet" => ParquetFormat(Some(targetSchema)) - case "dsv" => DSVFormat(Some(targetSchema)) + case "dsv" => DSVFormat(Some(targetSchema), isMultiline) case "json" => - JSONFormat(multiLine = isMultilineJSON.getOrElse(false), optionalSchema = readJsonSchema) + JSONFormat(multiLine = isMultiline, optionalSchema = readJsonSchema) case _ => throw new RuntimeException(s"Unsupported input data format $fileFormat.") } Vector(dataFormat.read(spark.read.options(sparkReaderOptions), inputDir)) diff --git a/src/main/scala/com/adidas/analytics/algo/shared/CustomDateFormatters.scala b/src/main/scala/com/adidas/analytics/algo/shared/CustomDateFormatters.scala index b107556..e5402ae 100644 --- a/src/main/scala/com/adidas/analytics/algo/shared/CustomDateFormatters.scala +++ b/src/main/scala/com/adidas/analytics/algo/shared/CustomDateFormatters.scala @@ -32,4 +32,19 @@ object CustomDateFormatters { .appendLiteral("/") .appendValue(ChronoField.YEAR, 4) .toFormatter + + val YEAR_MONTH_DAY_WITH_TIME: DateTimeFormatter = + new DateTimeFormatterBuilder() + .appendValue(ChronoField.YEAR, 4) + .appendLiteral("-") + .appendValue(ChronoField.MONTH_OF_YEAR, 2) + .appendLiteral("-") + .appendValue(ChronoField.DAY_OF_MONTH, 2) + .appendLiteral(" ") + .appendValue(ChronoField.HOUR_OF_DAY, 2) + .appendLiteral(":") + .appendValue(ChronoField.MINUTE_OF_HOUR, 2) + .appendLiteral(":") + .appendValue(ChronoField.SECOND_OF_MINUTE, 2) + .toFormatter } diff --git a/src/main/scala/com/adidas/analytics/algo/shared/DataReshapingTask.scala b/src/main/scala/com/adidas/analytics/algo/shared/DataReshapingTask.scala index 62554fb..59fa49b 100644 --- a/src/main/scala/com/adidas/analytics/algo/shared/DataReshapingTask.scala +++ b/src/main/scala/com/adidas/analytics/algo/shared/DataReshapingTask.scala @@ -56,7 +56,7 @@ trait DataReshapingTask extends DataReshapingTaskConfig with DateComponentDeriva partitionSourceColumnFormat: String, targetPartitions: Seq[String] )(df: DataFrame): DataFrame = - if (targetPartitions.nonEmpty) + if (partitionSourceColumn.nonEmpty && targetPartitions.nonEmpty) df.transform( withDateComponents(partitionSourceColumn, partitionSourceColumnFormat, targetPartitions) ) diff --git a/src/main/scala/com/adidas/analytics/algo/shared/DateComponentDerivation.scala b/src/main/scala/com/adidas/analytics/algo/shared/DateComponentDerivation.scala index 3e8a67f..bf3004b 100644 --- a/src/main/scala/com/adidas/analytics/algo/shared/DateComponentDerivation.scala +++ b/src/main/scala/com/adidas/analytics/algo/shared/DateComponentDerivation.scala @@ -5,8 +5,9 @@ import java.time.format.DateTimeFormatter import java.time.temporal.ChronoField import org.apache.spark.sql.DataFrame import org.apache.spark.sql.expressions.UserDefinedFunction -import org.apache.spark.sql.functions.{udf, _} -import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.StringType + import scala.util.{Failure, Success, Try} trait 
DateComponentDerivation { @@ -28,7 +29,6 @@ trait DateComponentDerivation { df, sourceDateColumnName, colName, - DateComponentDerivation.DEFAULT_4DIGIT_VALUE, customYear ) case "month" => @@ -36,7 +36,6 @@ trait DateComponentDerivation { df, sourceDateColumnName, colName, - DateComponentDerivation.DEFAULT_2DIGIT_VALUE, customMonth ) case "day" => @@ -44,7 +43,6 @@ trait DateComponentDerivation { df, sourceDateColumnName, colName, - DateComponentDerivation.DEFAULT_2DIGIT_VALUE, customDay ) case "week" => @@ -52,7 +50,6 @@ trait DateComponentDerivation { df, sourceDateColumnName, colName, - DateComponentDerivation.DEFAULT_2DIGIT_VALUE, customWeekOfYear ) } @@ -68,92 +65,78 @@ trait DateComponentDerivation { inputDf: DataFrame, sourceDateColumnName: String, targetColumnName: String, - defaultValue: Int, derivationFunction: UserDefinedFunction ): DataFrame = inputDf.withColumn( targetColumnName, - when( - derivationFunction( - col(sourceDateColumnName).cast(StringType), - col(tempFormatterColumnName) - ).isNotNull, - derivationFunction(col(sourceDateColumnName).cast(StringType), col(tempFormatterColumnName)) - ).otherwise(lit(defaultValue)) + derivationFunction(col(sourceDateColumnName).cast(StringType), col(tempFormatterColumnName)) ) - private val customWeekOfYear = udf( - (ts: String, formatter: String) => - Try { - getCustomFormatter(formatter) match { - case Some(customFormatter) => - LocalDate - .parse(ts, customFormatter) - .get(ChronoField.ALIGNED_WEEK_OF_YEAR) - case None => - LocalDate - .parse(ts, DateTimeFormatter.ofPattern(formatter)) - .get(ChronoField.ALIGNED_WEEK_OF_YEAR) - } - } match { - case Failure(_) => None - case Success(value) => value - }, - IntegerType - ) + private val customWeekOfYear: UserDefinedFunction = udf { (ts: String, formatter: String) => + Try { + getCustomFormatter(formatter) match { + case Some(customFormatter) => + LocalDate + .parse(ts, customFormatter) + .get(ChronoField.ALIGNED_WEEK_OF_YEAR) + case None => + LocalDate + .parse(ts, DateTimeFormatter.ofPattern(formatter)) + .get(ChronoField.ALIGNED_WEEK_OF_YEAR) + } + } match { + case Failure(_) => DateComponentDerivation.DEFAULT_2DIGIT_VALUE + case Success(value) => value + } + } - private val customYear = udf( - (ts: String, formatter: String) => - Try { - getCustomFormatter(formatter) match { - case Some(customFormatter) => LocalDate.parse(ts, customFormatter).get(ChronoField.YEAR) - case None => LocalDate.parse(ts, DateTimeFormatter.ofPattern(formatter)).getYear - } - } match { - case Failure(_) => None - case Success(value) => value - }, - IntegerType + private val customYear = udf((ts: String, formatter: String) => + Try { + getCustomFormatter(formatter) match { + case Some(customFormatter) => LocalDate.parse(ts, customFormatter).get(ChronoField.YEAR) + case None => LocalDate.parse(ts, DateTimeFormatter.ofPattern(formatter)).getYear + } + } match { + case Failure(_) => DateComponentDerivation.DEFAULT_4DIGIT_VALUE + case Success(value) => value + } ) - private val customDay = udf( - (ts: String, formatter: String) => - Try { - getCustomFormatter(formatter) match { - case Some(customFormatter) => - val day_type = - if (formatter.contains("dd")) ChronoField.DAY_OF_MONTH else ChronoField.DAY_OF_WEEK - LocalDate.parse(ts, customFormatter).get(day_type) - case None => LocalDate.parse(ts, DateTimeFormatter.ofPattern(formatter)).getDayOfMonth - } - } match { - case Failure(_) => None - case Success(value) => value - }, - IntegerType + private val customDay = udf((ts: String, formatter: String) => + Try { 
+ getCustomFormatter(formatter) match { + case Some(customFormatter) => + val dayType = + if (formatter.contains("dd")) ChronoField.DAY_OF_MONTH else ChronoField.DAY_OF_WEEK + LocalDate.parse(ts, customFormatter).get(dayType) + case None => LocalDate.parse(ts, DateTimeFormatter.ofPattern(formatter)).getDayOfMonth + } + } match { + case Failure(_) => DateComponentDerivation.DEFAULT_2DIGIT_VALUE + case Success(value) => value + } ) - private val customMonth = udf( - (ts: String, formatter: String) => - Try { - getCustomFormatter(formatter) match { - case Some(customFormatter) => LocalDate.parse(ts, customFormatter).getMonthValue - case None => LocalDate.parse(ts, DateTimeFormatter.ofPattern(formatter)).getMonthValue - } - } match { - case Failure(_) => None - case Success(value) => value - }, - IntegerType + private val customMonth = udf((ts: String, formatter: String) => + Try { + getCustomFormatter(formatter) match { + case Some(customFormatter) => LocalDate.parse(ts, customFormatter).getMonthValue + case None => LocalDate.parse(ts, DateTimeFormatter.ofPattern(formatter)).getMonthValue + } + } match { + case Failure(_) => DateComponentDerivation.DEFAULT_2DIGIT_VALUE + case Success(value) => value + } ) private def getCustomFormatter(dateFormatter: String): Option[DateTimeFormatter] = dateFormatter match { - case "yyyyww" => Option(CustomDateFormatters.YEAR_WEEK) - case "yyyywwe" => Option(CustomDateFormatters.YEAR_WEEK_DAY) - case "yyyyMM" => Option(CustomDateFormatters.YEAR_MONTH) - case "MM/dd/yyyy" => Option(CustomDateFormatters.MONTH_DAY_YEAR) - case _ => None + case "yyyyww" => Option(CustomDateFormatters.YEAR_WEEK) + case "yyyywwe" => Option(CustomDateFormatters.YEAR_WEEK_DAY) + case "yyyyMM" => Option(CustomDateFormatters.YEAR_MONTH) + case "MM/dd/yyyy" => Option(CustomDateFormatters.MONTH_DAY_YEAR) + case "yyyy-MM-dd HH:mm:ss" => Option(CustomDateFormatters.YEAR_MONTH_DAY_WITH_TIME) + case _ => None } } diff --git a/src/main/scala/com/adidas/analytics/config/loads/AppendLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/loads/AppendLoadConfiguration.scala index 58bb288..f49db91 100644 --- a/src/main/scala/com/adidas/analytics/config/loads/AppendLoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/loads/AppendLoadConfiguration.scala @@ -7,6 +7,7 @@ import com.adidas.analytics.util.{CatalogTableManager, LoadMode, OutputWriter} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util.DropMalformedMode import org.apache.spark.sql.types.{DataType, StructType} + import scala.util.parsing.json.JSONObject trait AppendLoadConfiguration @@ -49,13 +50,22 @@ trait AppendLoadConfiguration private val targetDir: Option[String] = configReader.getAsOption[String]("target_dir") + private val writeLoadMode: LoadMode = + configReader.getAsOption[String]("write_load_mode") match { + case Some("AppendUnionPartitions") => LoadMode.AppendUnionPartitions + case None => LoadMode.OverwritePartitions + } + + override protected val outputFilesNum: Option[Int] = + configReader.getAsOption[Int]("output_files_num") + override protected val writer: OutputWriter.AtomicWriter = dataType match { case STRUCTURED if targetTable.isDefined => OutputWriter.newTableLocationWriter( table = targetTable.get, format = ParquetFormat(Some(targetSchema)), targetPartitions = targetPartitions, - loadMode = LoadMode.OverwritePartitionsWithAddedColumns, + loadMode = writeLoadMode, metadataConfiguration = getMetaDataUpdateStrategy(targetTable.get, targetPartitions) ) 
case SEMISTRUCTURED if targetDir.isDefined => @@ -63,7 +73,7 @@ trait AppendLoadConfiguration location = targetDir.get, format = ParquetFormat(Some(targetSchema)), targetPartitions = targetPartitions, - loadMode = LoadMode.OverwritePartitions + loadMode = writeLoadMode ) case anotherDataType => throw new RuntimeException( diff --git a/src/main/scala/com/adidas/analytics/config/loads/DeltaLakeLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/loads/DeltaLakeLoadConfiguration.scala index 7aeab40..6ba9aa7 100644 --- a/src/main/scala/com/adidas/analytics/config/loads/DeltaLakeLoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/loads/DeltaLakeLoadConfiguration.scala @@ -64,16 +64,26 @@ trait DeltaLakeLoadConfiguration recordsToCondense.contains(row.getAs[String](recordModeColumnName)) } + protected val initCondensation: Boolean = + configReader.getAsOption[Boolean]("init_condensation").getOrElse(true) + protected val initCondensationWithRecordMode: Boolean = configReader.getAsOption[Boolean]("init_condensation_with_record_mode").getOrElse(true) - /* ignoreAffectedPartitionsMerge: covers the case where the partition key of the lake table can't - * be used to control the merge match conditions, because it is not constant per record, meaning - * it can change with time with the rest of the attributes of a record. It instructs the delta - * lake load algorithm to ignore the delta table partitions when merging the current with new - * data. */ - protected val ignoreAffectedPartitionsMerge: Boolean = - configReader.getAsOption[Boolean]("ignore_affected_partitions_merge").getOrElse(true) + /* affectedPartitionsMerge: covers the case where the partition key of the lake table can't be + * used to control the merge match conditions, because it is not constant per record, meaning it + * can change with time with the rest of the attributes of a record. It instructs the delta lake + * load algorithm to consider, or not, the affected delta table partitions when merging the + * current with new data. + * Without AffectedPartitionsMerge: + * new.bk1 = hist.bk1 and new.bk2 = hist.bk2 and new.pc1 = hist.pc1 and new.pc2 = hist.pc2 With + * AffectedPartitionsMerge: + * new.bk1 = hist.bk1 and new.bk2 = hist.bk2 and (hist.pc1= 2018 and hist.pc2 = 10) and (hist.pc1= + * 2019 and hist.pc2 = 10)... + * BK = Business Key; PC = Partition Column; New - New Data in the merge; Hist - Historical data + * in delta table involved in the merge. 
*/ + protected val affectedPartitionsMerge: Boolean = + configReader.getAsOption[Boolean]("affected_partitions_merge").getOrElse(true) protected var affectedPartitions: Seq[PartitionCriteria] = _ @@ -113,9 +123,6 @@ trait DeltaLakeLoadConfiguration } // JSON Source properties - protected val isMultilineJSON: Option[Boolean] = - configReader.getAsOption[Boolean]("is_multiline_json") - protected val readJsonSchema: Option[StructType] = configReader.getAsOption[JSONObject]("schema") match { case Some(value) => Some(DataType.fromJson(value.toString()).asInstanceOf[StructType]) @@ -131,10 +138,13 @@ trait DeltaLakeLoadConfiguration "spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" ) + spark.conf.set( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog" + ) spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false") spark.conf.set("spark.delta.merge.repartitionBeforeWrite", "true") - spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") if (!isManualRepartitioning) spark.conf.set( "spark.sql.shuffle.partitions", @@ -150,8 +160,9 @@ trait DeltaLakeLoadConfiguration logger.info( s"Repartition before write is ${spark.conf.get("spark.delta.merge.repartitionBeforeWrite")}" ) - logger.info(s"Number of shuffle partitions: ${spark.conf.get("spark.sql.shuffle.partitions")}") - logger.info(s"Spark serializer: ${spark.conf.get("spark.serializer")}") + logger.info( + s"Number of shuffle partitions: ${spark.conf.getOption("spark.sql.shuffle.partitions")}" + ) } } diff --git a/src/main/scala/com/adidas/analytics/config/loads/FullLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/loads/FullLoadConfiguration.scala index 9753630..fae81ed 100644 --- a/src/main/scala/com/adidas/analytics/config/loads/FullLoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/loads/FullLoadConfiguration.scala @@ -39,9 +39,6 @@ trait FullLoadConfiguration protected val nextTableLocationPrefix: String = nextTableLocation.substring(nextTableLocation.lastIndexOf('/')) - protected val isMultilineJSON: Option[Boolean] = - configReader.getAsOption[Boolean]("is_multiline_json") - protected val dropDateDerivedColumns: Boolean = configReader .getAsOption[Boolean]("drop_date_derived_columns") diff --git a/src/main/scala/com/adidas/analytics/config/loads/LoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/loads/LoadConfiguration.scala index 2e5f431..093733f 100644 --- a/src/main/scala/com/adidas/analytics/config/loads/LoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/loads/LoadConfiguration.scala @@ -17,6 +17,7 @@ trait LoadConfiguration { private val optionalSparkOptions: Map[String, String] = Map[String, Option[String]]( "nullValue" -> readNullValue, "quote" -> readQuoteValue, + "escape" -> readEscapeValue, "dateFormat" -> Some(dateFormat) ).collect { case (key, Some(value)) => (key, value) } @@ -39,6 +40,9 @@ trait LoadConfiguration { protected val sparkReaderOptions: Map[String, String] = requiredSparkOptions ++ optionalSparkOptions + protected val isMultiline: Boolean = + configReader.getAsOption[Boolean]("multi_line").getOrElse(false) + protected def configReader: ConfigReader protected def loadMode: String @@ -47,6 +51,9 @@ trait LoadConfiguration { protected def readQuoteValue: Option[String] = configReader.getAsOption[String]("quote_character") + protected def 
readEscapeValue: Option[String] = + configReader.getAsOption[String]("escape_character") + protected def computeTableStatistics: Boolean = configReader.getAsOption[Boolean]("compute_table_statistics").getOrElse(true) diff --git a/src/main/scala/com/adidas/analytics/config/templates/AlgorithmTemplateConfiguration.scala b/src/main/scala/com/adidas/analytics/config/templates/AlgorithmTemplateConfiguration.scala index db52128..be72abe 100644 --- a/src/main/scala/com/adidas/analytics/config/templates/AlgorithmTemplateConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/templates/AlgorithmTemplateConfiguration.scala @@ -50,7 +50,7 @@ trait AlgorithmTemplateConfiguration CatalogTableManager(targetTable, spark).getSchemaSafely(dfs) override protected val readers: Vector[InputReader.TableReader] = Vector( - // Obtaining a reader for the algorithm + // Obtaining a reader for the algorithm. InputReader.newTableReader(table = sourceTable) /* you can use a source location as parquet files on the lake instead of a hive table */ /* InputReader.newFileSystemReader(sourceLocation, DataFormat.ParquetFormat()) */ @@ -71,8 +71,8 @@ trait AlgorithmTemplateConfiguration OutputWriter.newTableLocationWriter( table = targetTable, format = ParquetFormat(Some(targetSchema)), - metadataConfiguration = getMetaDataUpdateStrategy(targetTable, Seq.empty), - targetPartitions = Seq.empty, + metadataConfiguration = getMetaDataUpdateStrategy(targetTable, Seq("", "", "")), + targetPartitions = Seq("", "", ""), /* If partitions are required, this would look like, e.g., Seq("year", "month") */ loadMode = LoadMode.OverwritePartitionsWithAddedColumns ) diff --git a/src/main/scala/com/adidas/analytics/util/DataFormat.scala b/src/main/scala/com/adidas/analytics/util/DataFormat.scala index b6971a9..143b48b 100644 --- a/src/main/scala/com/adidas/analytics/util/DataFormat.scala +++ b/src/main/scala/com/adidas/analytics/util/DataFormat.scala @@ -29,13 +29,15 @@ object DataFormat { } } - case class DSVFormat(optionalSchema: Option[StructType] = None) extends DataFormat { + case class DSVFormat(optionalSchema: Option[StructType] = None, multiLine: Boolean = false) + extends DataFormat { override def read(reader: DataFrameReader, locations: String*): DataFrame = { val filesString = locations.mkString(", ") logger.info(s"Reading DSV data from $filesString") optionalSchema .fold(reader.option("inferSchema", "true"))(schema => reader.schema(schema)) + .option("multiline", multiLine) .csv(locations: _*) } diff --git a/src/main/scala/com/adidas/analytics/util/DistCpWrapper.scala b/src/main/scala/com/adidas/analytics/util/DistCpWrapper.scala index 3bc2c7c..e1f0323 100644 --- a/src/main/scala/com/adidas/analytics/util/DistCpWrapper.scala +++ b/src/main/scala/com/adidas/analytics/util/DistCpWrapper.scala @@ -4,24 +4,22 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.tools.{DistCp, DistCpOptions} + import scala.collection.JavaConverters._ class DistCpWrapper(conf: Configuration, sources: Seq[Path], target: Path) { - private val baseOptions = new DistCpOptions(sources.asJava, target) - def run(mapsNum: Int = 10, atomic: Boolean = false, overwrite: Boolean = false): Job = { - val options = new DistCpOptions(baseOptions) - options.setAppend(false) - options.setBlocking(true) - options.setSyncFolder(false) - options.setDeleteMissing(false) - - options.setMaxMaps(mapsNum) - options.setOverwrite(overwrite) - options.setAtomicCommit(atomic) - - 
new DistCp(conf, options).execute() + val baseOptions = new DistCpOptions.Builder(sources.asJava, target) + .withAppend(false) + .withBlocking(true) + .withSyncFolder(false) + .withDeleteMissing(false) + .maxMaps(mapsNum) + .withOverwrite(overwrite) + .withAtomicCommit(atomic) + + new DistCp(conf, baseOptions.build()).execute() } } diff --git a/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/lake_data_post.psv b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/lake_data_post.psv new file mode 100644 index 0000000..35a0c20 --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/lake_data_post.psv @@ -0,0 +1,14 @@ +04/27/2020|04/29/2020|02/28/2020|04/28/2020|customer1|150|2020|17 +04/26/2020|04/29/2020|02/29/2020|04/29/2020|customer3|30|2020|17 +04/25/2020|03/29/2020|02/27/2020|03/30/2020|customer4|90|2020|17 +04/26/2020|02/29/2020|03/28/2020|04/28/2020|customer5|110|2020|17 +05/02/2020|01/29/2020|04/28/2020|02/28/2020|customer6|1550|2020|18 +05/03/2020|03/29/2020|05/28/2020|01/28/2020|customer7|10|2020|18 +05/04/2020|02/29/2020|03/28/2020|02/28/2020|customer8|120|2020|18 +05/05/2020|04/29/2020|03/28/2020|03/28/2020|customer9|330|2020|18 +05/07/2020|02/29/2020|01/28/2020|03/28/2020|customer10|250|2020|19 +05/08/2020|03/29/2020|02/28/2020|04/28/2020|customer11|140|2020|19 +05/15/2020|04/22/2020|03/22/2020|03/22/2020|customer2|10|2020|20 +05/16/2020|04/21/2020|03/21/2020|03/21/2020|customer21|110|2020|20 +05/17/2020|04/22/2020|03/12/2020|03/12/2020|customer211|1110|2020|20 +05/18/2020|04/28/2020|03/11/2020|03/11/2020|customer2111|11110|2020|20 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/lake_data_pre.psv b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/lake_data_pre.psv new file mode 100644 index 0000000..84bdc51 --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/lake_data_pre.psv @@ -0,0 +1,10 @@ +04/27/2020|04/29/2020|02/28/2020|04/28/2020|customer1|150|2020|17 +04/26/2020|04/29/2020|02/29/2020|04/29/2020|customer3|30|2020|17 +04/25/2020|03/29/2020|02/27/2020|03/30/2020|customer4|90|2020|17 +04/26/2020|02/29/2020|03/28/2020|04/28/2020|customer5|110|2020|17 +05/02/2020|01/29/2020|04/28/2020|02/28/2020|customer6|1550|2020|18 +05/03/2020|03/29/2020|05/28/2020|01/28/2020|customer7|10|2020|18 +05/04/2020|02/29/2020|03/28/2020|02/28/2020|customer8|120|2020|18 +05/05/2020|04/29/2020|03/28/2020|03/28/2020|customer9|330|2020|18 +05/07/2020|02/29/2020|01/28/2020|03/28/2020|customer10|250|2020|19 +05/08/2020|03/29/2020|02/28/2020|04/28/2020|customer11|140|2020|19 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/new_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/new_data.psv new file mode 100644 index 0000000..ac413dc --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/new_data.psv @@ -0,0 +1,4 @@ +05/15/2020|04/22/2020|03/22/2020|03/22/2020|customer2|10 
+05/16/2020|04/21/2020|03/21/2020|03/21/2020|customer21|110 +05/17/2020|04/22/2020|03/12/2020|03/12/2020|customer211|1110 +00/00/0000|00/00/0000|00/00/0000|00/00/0000|customer2111|11110 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/params.json b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/params.json new file mode 100644 index 0000000..e4c8b57 --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/params.json @@ -0,0 +1,13 @@ +{ + "backup_dir": "/tmp/tests/test_lake/test_table/data_backup/", + "current_dir": "/tmp/tests/test_lake/test_table/data/", + "source_dir": "/tmp/tests/test_landing/test_table/data", + "header_dir": "/tmp/tests/test_landing/test_table/header", + "delimiter": "|", + "date_format": "MM/dd/yyyy", + "file_format": "dsv", + "has_header": false, + "partition_column": "report_date", + "target_partitions": ["year", "week"], + "target_table": "test_lake.test_table" +} \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/target_schema.json b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/target_schema.json new file mode 100644 index 0000000..c42e7fd --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/AppendLoadTestRes/partitioned_and_date_columns_exception/target_schema.json @@ -0,0 +1,45 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "report_date", + "type" : "date", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date1", + "type" : "date", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date2", + "type" : "date", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date3", + "type" : "date", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "integer", + "nullable" : true, + "metadata" : { } + } , { + "name" : "week", + "type" : "integer", + "nullable" : true, + "metadata" : { } + } + ] +} diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/init_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/init_data.psv index c598062..71e59f5 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/init_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/init_data.psv @@ -1,22 +1,22 @@ -actrequest_timestamp|datapakid|partno|record|salesorder|item|recordmode|date|customer|article|amount -00000000000000t|0|0|0|1|1|N||customer1|article1|100 -00000000000000t|0|0|0|1|1||20160601|customer1|article1|100 -00000000000000t|0|0|0|1|2|N|20160601|customer1|article2|200 -00000000000000t|0|0|0|1|3|N|20160601|customer1|article3|50 -00000000000000t|0|0|0|2|1|N|20170215|customer2|article4|10 -00000000000000t|0|0|0|2|2||20170215|customer2|article6|50 -00000000000000t|0|0|0|2|2|N||customer2|article6|50 
-00000000000000t|0|0|0|2|3|N|20170215|customer2|article1|30 -00000000000000t|0|0|0|3|1|N|20170215|customer1|article5|200 -00000000000000t|0|0|0|3|2|N|20170215|customer1|article2|120 -00000000000000t|0|0|0|3|3|N|20170215|customer1|article4|90 -00000000000000t|0|0|0|4|1|N|20170430|customer3|article3|80 -00000000000000t|0|0|0|4|2|N|20170430|customer3|article7|70 -00000000000000t|0|0|0|4|3|N|20170430|customer3|article1|30 -00000000000000t|0|0|0|4|4|N|20170430|customer3|article2|50 -00000000000000t|0|0|0|5|1|N|20170510|customer4|article6|150 -00000000000000t|0|0|0|5|2|N|20170510|customer4|article3|100 -00000000000000t|0|0|0|5|3|N|20170510|customer4|article5|80 -00000000000000t|0|0|0|6|1|N|20170601|customer2|article4|100 -00000000000000t|0|0|0|6|2|N|20170601|customer2|article1|50 -00000000000000t|0|0|0|6|3|N|20170601|customer2|article2|90 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT +20180110100000t|00000000000000t|0|0|0|1|1|N||customer1|article1|100 +20180110100000t|00000000000000t|0|0|0|1|1||20160601|customer1|article1|100 +20180110100000t|00000000000000t|0|0|0|1|2|N|20160601|customer1|article2|200 +20180110100000t|00000000000000t|0|0|0|1|3|N|20160601|customer1|article3|50 +20180110100000t|00000000000000t|0|0|0|2|1|N|20170215|customer2|article4|10 +20180110100000t|00000000000000t|0|0|0|2|2||20170215|customer2|article6|50 +20180110100000t|00000000000000t|0|0|0|2|2|N||customer2|article6|50 +20180110100000t|00000000000000t|0|0|0|2|3|N|20170215|customer2|article1|30 +20180110100000t|00000000000000t|0|0|0|3|1|N|20170215|customer1|article5|200 +20180110100000t|00000000000000t|0|0|0|3|2|N|20170215|customer1|article2|120 +20180110100000t|00000000000000t|0|0|0|3|3|N|20170215|customer1|article4|90 +20180110100000t|00000000000000t|0|0|0|4|1|N|20170430|customer3|article3|80 +20180110100000t|00000000000000t|0|0|0|4|2|N|20170430|customer3|article7|70 +20180110100000t|00000000000000t|0|0|0|4|3|N|20170430|customer3|article1|30 +20180110100000t|00000000000000t|0|0|0|4|4|N|20170430|customer3|article2|50 +20180110100000t|00000000000000t|0|0|0|5|1|N|20170510|customer4|article6|150 +20180110100000t|00000000000000t|0|0|0|5|2|N|20170510|customer4|article3|100 +20180110100000t|00000000000000t|0|0|0|5|3|N|20170510|customer4|article5|80 +20180110100000t|00000000000000t|0|0|0|6|1|N|20170601|customer2|article4|100 +20180110100000t|00000000000000t|0|0|0|6|2|N|20170601|customer2|article1|50 +20180110100000t|00000000000000t|0|0|0|6|3|N|20170601|customer2|article2|90 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/new_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/new_data.psv index 35d4c04..8356c68 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/new_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/added_columns_and_duplicates_in_init/new_data.psv @@ -1,21 +1,21 @@ -ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT|DISCOUNT|UNINTERESTING_COLUMN -20180110120052t|request1|1|1|1|7|1|N|20180110|customer5|article2|120|0.0|10.0 -20180110120052t|request1|1|1|2|1|1|X|20160601|customer1|article1|100|10.0| -20180110120052t|request1|1|1|3|1|1||20160601|customer1|article1|150|10.0| 
-20180110120052t|request1|1|1|4|2|2|X|20170215|customer2|article6|50|10.0| -20180110120052t|request1|1|1|5|2|2||20170215|customer2|article2|50|10.0| -20180110120052t|request1|1|1|6|3|2|D|20170215|customer1|article2|120|10.0| -20180110120052t|request1|1|1|7|3|3|R|20170215|customer1|article4|-90|10.0| -20180110120052t|request1|1|1|8|4|1|X|20170430|customer3|article3|80|10.0| -20180110120052t|request1|1|1|9|4|1||20170430|customer3|article3|100|10.0| -20180110120052t|request1|1|1|10|4|2|X|20170430|customer3|article7|70|10.0| -20180110120052t|request1|1|1|11|4|2||20170430|customer3|article7|80|10.0| -20180110120052t|request1|1|1|12|4|3|D|20170430|customer3|article1|30|10.0| -20180110120052t|request1|1|1|13|4|4|X|20170430|customer3|article2|50|10.0| -20180110120052t|request1|1|1|14|4|4||20170430|customer3|article2|60|10.0| -20180110120052t|request1|2|1|1|4|4|X|20170430|customer3|article2|60|10.0| -20180110120052t|request1|2|1|2|4|4||20170430|customer3|article2|70|10.0| -20180110130103t|request2|1|1|3|4|1|X|20170430|customer3|article3|100|10.0| -20180110130103t|request2|1|1|4|4|1||20170430|customer3|article3|70|10.0| -20180110130103t|request2|1|1|5|4|2|D|20170430|customer3|article7|80|10.0| -20180110130103t|request2|1|1|6|4|3|N|20170430|customer3|article1|40|10.0| \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT|DISCOUNT|UNINTERESTING_COLUMN +20180111120052t|20180110120052t|request1|1|1|1|7|1|N|20180110|customer5|article2|120|0.0|10.0 +20180111120052t|20180110120052t|request1|1|1|2|1|1|X|20160601|customer1|article1|100|10.0| +20180111120052t|20180110120052t|request1|1|1|3|1|1||20160601|customer1|article1|150|10.0| +20180111120052t|20180110120052t|request1|1|1|4|2|2|X|20170215|customer2|article6|50|10.0| +20180111120052t|20180110120052t|request1|1|1|5|2|2||20170215|customer2|article2|50|10.0| +20180111120052t|20180110120052t|request1|1|1|6|3|2|D|20170215|customer1|article2|120|10.0| +20180111120052t|20180110120052t|request1|1|1|7|3|3|R|20170215|customer1|article4|-90|10.0| +20180111120052t|20180110120052t|request1|1|1|8|4|1|X|20170430|customer3|article3|80|10.0| +20180111120052t|20180110120052t|request1|1|1|9|4|1||20170430|customer3|article3|100|10.0| +20180111120052t|20180110120052t|request1|1|1|10|4|2|X|20170430|customer3|article7|70|10.0| +20180111120052t|20180110120052t|request1|1|1|11|4|2||20170430|customer3|article7|80|10.0| +20180111120052t|20180110120052t|request1|1|1|12|4|3|D|20170430|customer3|article1|30|10.0| +20180111120052t|20180110120052t|request1|1|1|13|4|4|X|20170430|customer3|article2|50|10.0| +20180111120052t|20180110120052t|request1|1|1|14|4|4||20170430|customer3|article2|60|10.0| +20180111120052t|20180110120052t|request1|2|1|1|4|4|X|20170430|customer3|article2|60|10.0| +20180111120052t|20180110120052t|request1|2|1|2|4|4||20170430|customer3|article2|70|10.0| +20180111120052t|20180110130103t|request2|1|1|3|4|1|X|20170430|customer3|article3|100|10.0| +20180111120052t|20180110130103t|request2|1|1|4|4|1||20170430|customer3|article3|70|10.0| +20180111120052t|20180110130103t|request2|1|1|5|4|2|D|20170430|customer3|article7|80|10.0| +20180111120052t|20180110130103t|request2|1|1|6|4|3|N|20170430|customer3|article1|40|10.0| \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/init_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/init_data.psv index 
99b151d..26fd539 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/init_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/init_data.psv @@ -1,20 +1,20 @@ -ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT -00000000000000t|0|0|0|1|1||20160601|customer1|article1|100 -00000000000000t|0|0|0|1|2||20160601|customer1|article2|200 -00000000000000t|0|0|0|1|3||20160601|customer1|article3|50 -00000000000000t|0|0|0|2|1||20170215|customer2|article4|10 -00000000000000t|0|0|0|2|2||20170215|customer2|article6|50 -00000000000000t|0|0|0|2|3||20170215|customer2|article1|30 -00000000000000t|0|0|0|3|1||20170215|customer1|article5|200 -00000000000000t|0|0|0|3|2||20170215|customer1|article2|120 -00000000000000t|0|0|0|3|3||20170215|customer1|article4|90 -00000000000000t|0|0|0|4|1||20170430|customer3|article3|80 -00000000000000t|0|0|0|4|2||20170430|customer3|article7|70 -00000000000000t|0|0|0|4|3||20170430|customer3|article1|30 -00000000000000t|0|0|0|4|4||20170430|customer3|article2|50 -00000000000000t|0|0|0|5|1||20170510|customer4|article6|150 -00000000000000t|0|0|0|5|2||20170510|customer4|article3|100 -00000000000000t|0|0|0|5|3||20170510|customer4|article5|80 -00000000000000t|0|0|0|6|1||20170601|customer2|article4|100 -00000000000000t|0|0|0|6|2||20170601|customer2|article1|50 -00000000000000t|0|0|0|6|3||20170601|customer2|article2|90 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT +20180110100000t|00000000000000t|0|0|0|1|1||20160601|customer1|article1|100 +20180110100000t|00000000000000t|0|0|0|1|2||20160601|customer1|article2|200 +20180110100000t|00000000000000t|0|0|0|1|3||20160601|customer1|article3|50 +20180110100000t|00000000000000t|0|0|0|2|1||20170215|customer2|article4|10 +20180110100000t|00000000000000t|0|0|0|2|2||20170215|customer2|article6|50 +20180110100000t|00000000000000t|0|0|0|2|3||20170215|customer2|article1|30 +20180110100000t|00000000000000t|0|0|0|3|1||20170215|customer1|article5|200 +20180110100000t|00000000000000t|0|0|0|3|2||20170215|customer1|article2|120 +20180110100000t|00000000000000t|0|0|0|3|3||20170215|customer1|article4|90 +20180110100000t|00000000000000t|0|0|0|4|1||20170430|customer3|article3|80 +20180110100000t|00000000000000t|0|0|0|4|2||20170430|customer3|article7|70 +20180110100000t|00000000000000t|0|0|0|4|3||20170430|customer3|article1|30 +20180110100000t|00000000000000t|0|0|0|4|4||20170430|customer3|article2|50 +20180110100000t|00000000000000t|0|0|0|5|1||20170510|customer4|article6|150 +20180110100000t|00000000000000t|0|0|0|5|2||20170510|customer4|article3|100 +20180110100000t|00000000000000t|0|0|0|5|3||20170510|customer4|article5|80 +20180110100000t|00000000000000t|0|0|0|6|1||20170601|customer2|article4|100 +20180110100000t|00000000000000t|0|0|0|6|2||20170601|customer2|article1|50 +20180110100000t|00000000000000t|0|0|0|6|3||20170601|customer2|article2|90 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/new_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/new_data.psv index a8dc7b7..9e062f4 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/new_data.psv +++ 
b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/new_data.psv @@ -1,21 +1,21 @@ -ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT -20180110120052t|request1|1|1|1|7|1|N|20180110|customer5|article2|120 -20180110120052t|request1|1|1|2|1|1|X|20160601|customer1|article1|100 -20180110120052t|request1|1|1|3|1|1||20160601|customer1|article1|150 -20180110120052t|request1|1|1|4|2|2|X|20170215|customer2|article6|50 -20180110120052t|request1|1|1|5|2|2||20170215|customer2|article2|50 -20180110120052t|request1|1|1|6|3|2|D|20170215|customer1|article2|120 -20180110120052t|request1|1|1|7|3|3|R|20170215|customer1|article4|-90 -20180110120052t|request1|1|1|8|4|1|X|20170430|customer3|article3|80 -20180110120052t|request1|1|1|9|4|1||20170430|customer3|article3|100 -20180110120052t|request1|1|1|10|4|2|X|20170430|customer3|article7|70 -20180110120052t|request1|1|1|11|4|2||20170430|customer3|article7|80 -20180110120052t|request1|1|1|12|4|3|D|20170430|customer3|article1|30 -20180110120052t|request1|1|1|13|4|4|X|20170430|customer3|article2|50 -20180110120052t|request1|1|1|14|4|4||20170430|customer3|article2|60 -20180110120052t|request1|2|1|1|4|4|X|20170430|customer3|article2|60 -20180110120052t|request1|2|1|2|4|4||20170430|customer3|article2|70 -20180110130103t|request2|1|1|3|4|1|X|20170430|customer3|article3|100 -20180110130103t|request2|1|1|4|4|1||20170430|customer3|article3|70 -20180110130103t|request2|1|1|5|4|2|D|20170430|customer3|article7|80 -20180110130103t|request2|1|1|6|4|3|N|20170430|customer3|article1|40 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT +20180111120052t|20180110120052t|request1|1|1|1|7|1|N|20180110|customer5|article2|120 +20180111120052t|20180110120052t|request1|1|1|2|1|1|X|20160601|customer1|article1|100 +20180111120052t|20180110120052t|request1|1|1|3|1|1||20160601|customer1|article1|150 +20180111120052t|20180110120052t|request1|1|1|4|2|2|X|20170215|customer2|article6|50 +20180111120052t|20180110120052t|request1|1|1|5|2|2||20170215|customer2|article2|50 +20180111120052t|20180110120052t|request1|1|1|6|3|2|D|20170215|customer1|article2|120 +20180111120052t|20180110120052t|request1|1|1|7|3|3|R|20170215|customer1|article4|-90 +20180111120052t|20180110120052t|request1|1|1|8|4|1|X|20170430|customer3|article3|80 +20180111120052t|20180110120052t|request1|1|1|9|4|1||20170430|customer3|article3|100 +20180111120052t|20180110120052t|request1|1|1|10|4|2|X|20170430|customer3|article7|70 +20180111120052t|20180110120052t|request1|1|1|11|4|2||20170430|customer3|article7|80 +20180111120052t|20180110120052t|request1|1|1|12|4|3|D|20170430|customer3|article1|30 +20180111120052t|20180110120052t|request1|1|1|13|4|4|X|20170430|customer3|article2|50 +20180111120052t|20180110120052t|request1|1|1|14|4|4||20170430|customer3|article2|60 +20180111120052t|20180110120052t|request1|2|1|1|4|4|X|20170430|customer3|article2|60 +20180111120052t|20180110120052t|request1|2|1|2|4|4||20170430|customer3|article2|70 +20180111120052t|20180110130103t|request2|1|1|3|4|1|X|20170430|customer3|article3|100 +20180111120052t|20180110130103t|request2|1|1|4|4|1||20170430|customer3|article3|70 +20180111120052t|20180110130103t|request2|1|1|5|4|2|D|20170430|customer3|article7|80 +20180111120052t|20180110130103t|request2|1|1|6|4|3|N|20170430|customer3|article1|40 \ No newline at end of file diff --git 
a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/params.json b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/params.json index 3bdf4b7..16658f7 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/params.json +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/nonpartitioned/params.json @@ -11,6 +11,7 @@ "item" ], "condensation_key": [ + "m3d_timestamp", "actrequest_timestamp", "datapakid", "partno", diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/params.json b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/params.json index 6fc70bb..12bcf27 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/params.json +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/params.json @@ -11,6 +11,7 @@ "item" ], "condensation_key": [ + "m3d_timestamp", "actrequest_timestamp", "datapakid", "partno", diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/init_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/init_data.psv index 489bba4..ccb30d9 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/init_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/init_data.psv @@ -1,20 +1,20 @@ -ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|DATE|CUSTOMER|ARTICLE|RECORDMODE|AMOUNT -00000000000000t|0|0|0|1|1|20160601|customer1|article1||100 -00000000000000t|0|0|0|1|2|20160601|customer1|article2||200 -00000000000000t|0|0|0|1|3|20160601|customer1|article3||50 -00000000000000t|0|0|0|2|1|20170215|customer2|article4||10 -00000000000000t|0|0|0|2|2|20170215|customer2|article6||50 -00000000000000t|0|0|0|2|3|20170215|customer2|article1||30 -00000000000000t|0|0|0|3|1|20170215|customer1|article5||200 -00000000000000t|0|0|0|3|2|20170215|customer1|article2||120 -00000000000000t|0|0|0|3|3|20170215|customer1|article4||90 -00000000000000t|0|0|0|4|1|20170430|customer3|article3||80 -00000000000000t|0|0|0|4|2|20170430|customer3|article7||70 -00000000000000t|0|0|0|4|3|20170430|customer3|article1||30 -00000000000000t|0|0|0|4|4|20170430|customer3|article2||50 -00000000000000t|0|0|0|5|1|20170510|customer4|article6||150 -00000000000000t|0|0|0|5|2|20170510|customer4|article3||100 -00000000000000t|0|0|0|5|3|20170510|customer4|article5||80 -00000000000000t|0|0|0|6|1|20170601|customer2|article4||100 -00000000000000t|0|0|0|6|2|20170601|customer2|article1||50 -00000000000000t|0|0|0|6|3|20170601|customer2|article2||90 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|DATE|CUSTOMER|ARTICLE|RECORDMODE|AMOUNT +20180110100000t|00000000000000t|0|0|0|1|1|20160601|customer1|article1||100 +20180110100000t|00000000000000t|0|0|0|1|2|20160601|customer1|article2||200 +20180110100000t|00000000000000t|0|0|0|1|3|20160601|customer1|article3||50 +20180110100000t|00000000000000t|0|0|0|2|1|20170215|customer2|article4||10 +20180110100000t|00000000000000t|0|0|0|2|2|20170215|customer2|article6||50 +20180110100000t|00000000000000t|0|0|0|2|3|20170215|customer2|article1||30 +20180110100000t|00000000000000t|0|0|0|3|1|20170215|customer1|article5||200 
+20180110100000t|00000000000000t|0|0|0|3|2|20170215|customer1|article2||120 +20180110100000t|00000000000000t|0|0|0|3|3|20170215|customer1|article4||90 +20180110100000t|00000000000000t|0|0|0|4|1|20170430|customer3|article3||80 +20180110100000t|00000000000000t|0|0|0|4|2|20170430|customer3|article7||70 +20180110100000t|00000000000000t|0|0|0|4|3|20170430|customer3|article1||30 +20180110100000t|00000000000000t|0|0|0|4|4|20170430|customer3|article2||50 +20180110100000t|00000000000000t|0|0|0|5|1|20170510|customer4|article6||150 +20180110100000t|00000000000000t|0|0|0|5|2|20170510|customer4|article3||100 +20180110100000t|00000000000000t|0|0|0|5|3|20170510|customer4|article5||80 +20180110100000t|00000000000000t|0|0|0|6|1|20170601|customer2|article4||100 +20180110100000t|00000000000000t|0|0|0|6|2|20170601|customer2|article1||50 +20180110100000t|00000000000000t|0|0|0|6|3|20170601|customer2|article2||90 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/new_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/new_data.psv index 6d9e5a5..0f78b03 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/new_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/removed_columns/new_data.psv @@ -1,21 +1,21 @@ -ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE -20180110120052|request1|1|1|1|7|1|N|20180110|customer5|article2 -20180110120052|request1|1|1|2|1|1|X|20160601|customer1|article1 -20180110120052|request1|1|1|3|1|1||20160601|customer1|article1 -20180110120052|request1|1|1|4|2|2|X|20170215|customer2|article6 -20180110120052|request1|1|1|5|2|2||20170215|customer2|article2 -20180110120052|request1|1|1|6|3|2|D|20170215|customer1|article2 -20180110120052|request1|1|1|7|3|3|R|20170215|customer1|article4 -20180110120052|request1|1|1|8|4|1|X|20170430|customer3|article3 -20180110120052|request1|1|1|9|4|1||20170430|customer3|article3 -20180110120052|request1|1|1|10|4|2|X|20170430|customer3|article7 -20180110120052|request1|1|1|11|4|2||20170430|customer3|article7 -20180110120052|request1|1|1|12|4|3|D|20170430|customer3|article1|30 -20180110120052|request1|1|1|13|4|4|X|20170430|customer3|article2 -20180110120052|request1|1|1|14|4|4||20170430|customer3|article2 -20180110120052|request1|2|1|1|4|4|X|20170430|customer3|article2 -20180110120052|request1|2|1|2|4|4||20170430|customer3|article2 -20180110130103|request2|1|1|3|4|1|X|20170430|customer3|article3 -20180110130103|request2|1|1|4|4|1||20170430|customer3|article3 -20180110130103|request2|1|1|5|4|2|D|20170430|customer3|article7 -20180110130103|request2|1|1|6|4|3|N|20170430|customer3|article1 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE +20180111120052t|20180110120052|request1|1|1|1|7|1|N|20180110|customer5|article2 +20180111120052t|20180110120052|request1|1|1|2|1|1|X|20160601|customer1|article1 +20180111120052t|20180110120052|request1|1|1|3|1|1||20160601|customer1|article1 +20180111120052t|20180110120052|request1|1|1|4|2|2|X|20170215|customer2|article6 +20180111120052t|20180110120052|request1|1|1|5|2|2||20170215|customer2|article2 +20180111120052t|20180110120052|request1|1|1|6|3|2|D|20170215|customer1|article2 +20180111120052t|20180110120052|request1|1|1|7|3|3|R|20170215|customer1|article4 
+20180111120052t|20180110120052|request1|1|1|8|4|1|X|20170430|customer3|article3 +20180111120052t|20180110120052|request1|1|1|9|4|1||20170430|customer3|article3 +20180111120052t|20180110120052|request1|1|1|10|4|2|X|20170430|customer3|article7 +20180111120052t|20180110120052|request1|1|1|11|4|2||20170430|customer3|article7 +20180111120052t|20180110120052|request1|1|1|12|4|3|D|20170430|customer3|article1|30 +20180111120052t|20180110120052|request1|1|1|13|4|4|X|20170430|customer3|article2 +20180111120052t|20180110120052|request1|1|1|14|4|4||20170430|customer3|article2 +20180111120052t|20180110120052|request1|2|1|1|4|4|X|20170430|customer3|article2 +20180111120052t|20180110120052|request1|2|1|2|4|4||20170430|customer3|article2 +20180111120052t|20180110130103|request2|1|1|3|4|1|X|20170430|customer3|article3 +20180111120052t|20180110130103|request2|1|1|4|4|1||20170430|customer3|article3 +20180111120052t|20180110130103|request2|1|1|5|4|2|D|20170430|customer3|article7 +20180111120052t|20180110130103|request2|1|1|6|4|3|N|20170430|customer3|article1 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/init_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/init_data.psv index 489bba4..ccb30d9 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/init_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/init_data.psv @@ -1,20 +1,20 @@ -ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|DATE|CUSTOMER|ARTICLE|RECORDMODE|AMOUNT -00000000000000t|0|0|0|1|1|20160601|customer1|article1||100 -00000000000000t|0|0|0|1|2|20160601|customer1|article2||200 -00000000000000t|0|0|0|1|3|20160601|customer1|article3||50 -00000000000000t|0|0|0|2|1|20170215|customer2|article4||10 -00000000000000t|0|0|0|2|2|20170215|customer2|article6||50 -00000000000000t|0|0|0|2|3|20170215|customer2|article1||30 -00000000000000t|0|0|0|3|1|20170215|customer1|article5||200 -00000000000000t|0|0|0|3|2|20170215|customer1|article2||120 -00000000000000t|0|0|0|3|3|20170215|customer1|article4||90 -00000000000000t|0|0|0|4|1|20170430|customer3|article3||80 -00000000000000t|0|0|0|4|2|20170430|customer3|article7||70 -00000000000000t|0|0|0|4|3|20170430|customer3|article1||30 -00000000000000t|0|0|0|4|4|20170430|customer3|article2||50 -00000000000000t|0|0|0|5|1|20170510|customer4|article6||150 -00000000000000t|0|0|0|5|2|20170510|customer4|article3||100 -00000000000000t|0|0|0|5|3|20170510|customer4|article5||80 -00000000000000t|0|0|0|6|1|20170601|customer2|article4||100 -00000000000000t|0|0|0|6|2|20170601|customer2|article1||50 -00000000000000t|0|0|0|6|3|20170601|customer2|article2||90 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|DATE|CUSTOMER|ARTICLE|RECORDMODE|AMOUNT +20180110100000t|00000000000000t|0|0|0|1|1|20160601|customer1|article1||100 +20180110100000t|00000000000000t|0|0|0|1|2|20160601|customer1|article2||200 +20180110100000t|00000000000000t|0|0|0|1|3|20160601|customer1|article3||50 +20180110100000t|00000000000000t|0|0|0|2|1|20170215|customer2|article4||10 +20180110100000t|00000000000000t|0|0|0|2|2|20170215|customer2|article6||50 +20180110100000t|00000000000000t|0|0|0|2|3|20170215|customer2|article1||30 +20180110100000t|00000000000000t|0|0|0|3|1|20170215|customer1|article5||200 
+20180110100000t|00000000000000t|0|0|0|3|2|20170215|customer1|article2||120 +20180110100000t|00000000000000t|0|0|0|3|3|20170215|customer1|article4||90 +20180110100000t|00000000000000t|0|0|0|4|1|20170430|customer3|article3||80 +20180110100000t|00000000000000t|0|0|0|4|2|20170430|customer3|article7||70 +20180110100000t|00000000000000t|0|0|0|4|3|20170430|customer3|article1||30 +20180110100000t|00000000000000t|0|0|0|4|4|20170430|customer3|article2||50 +20180110100000t|00000000000000t|0|0|0|5|1|20170510|customer4|article6||150 +20180110100000t|00000000000000t|0|0|0|5|2|20170510|customer4|article3||100 +20180110100000t|00000000000000t|0|0|0|5|3|20170510|customer4|article5||80 +20180110100000t|00000000000000t|0|0|0|6|1|20170601|customer2|article4||100 +20180110100000t|00000000000000t|0|0|0|6|2|20170601|customer2|article1||50 +20180110100000t|00000000000000t|0|0|0|6|3|20170601|customer2|article2||90 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/new_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/new_data.psv index b5e17f7..731061d 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/new_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/new_data.psv @@ -1,21 +1,21 @@ -ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT -20180110120052|request1|1|1|1|7|1|N|20180110|customer5|article2|120 -20180110120052|request1|1|1|2|1|1|X|20160601|customer1|article1|100 -20180110120052|request1|1|1|3|1|1||20160602|customer1|article1|150 -20180110120052|request1|1|1|4|2|2|X|20170215|customer2|article6|50 -20180110120052|request1|1|1|5|2|2||20170216|customer2|article2|50 -20180110120052|request1|1|1|6|3|2|D|20170215|customer1|article2|120 -20180110120052|request1|1|1|7|3|3|R|20170215|customer1|article4|-90 -20180110120052|request1|1|1|8|4|1|X|20170430|customer3|article3|80 -20180110120052|request1|1|1|9|4|1||20170430|customer3|article3|100 -20180110120052|request1|1|1|10|4|2|X|20170430|customer3|article7|70 -20180110120052|request1|1|1|11|4|2||20170430|customer3|article7|80 -20180110120052|request1|1|1|12|4|3|D|20170430|customer3|article1|30 -20180110120052|request1|1|1|13|4|4|X|20170430|customer3|article2|50 -20180110120052|request1|1|1|14|4|4||20170430|customer3|article2|60 -20180110120052|request1|2|1|1|4|4|X|20170430|customer3|article2|60 -20180110120052|request1|2|1|2|4|4||20170430|customer3|article2|70 -20180110130103|request2|1|1|3|4|1|X|20170430|customer3|article3|100 -20180110130103|request2|1|1|4|4|1||20170430|customer3|article3|70 -20180110130103|request2|1|1|5|4|2|D|20170430|customer3|article7|80 -20180110130103|request2|1|1|6|4|3|N|20170430|customer3|article1|40 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT +20180111120052t|20180110120052|request1|1|1|1|7|1|N|20180110|customer5|article2|120 +20180111120052t|20180110120052|request1|1|1|2|1|1|X|20160601|customer1|article1|100 +20180111120052t|20180110120052|request1|1|1|3|1|1||20160602|customer1|article1|150 +20180111120052t|20180110120052|request1|1|1|4|2|2|X|20170215|customer2|article6|50 +20180111120052t|20180110120052|request1|1|1|5|2|2||20170216|customer2|article2|50 
+20180111120052t|20180110120052|request1|1|1|6|3|2|D|20170215|customer1|article2|120 +20180111120052t|20180110120052|request1|1|1|7|3|3|R|20170215|customer1|article4|-90 +20180111120052t|20180110120052|request1|1|1|8|4|1|X|20170430|customer3|article3|80 +20180111120052t|20180110120052|request1|1|1|9|4|1||20170430|customer3|article3|100 +20180111120052t|20180110120052|request1|1|1|10|4|2|X|20170430|customer3|article7|70 +20180111120052t|20180110120052|request1|1|1|11|4|2||20170430|customer3|article7|80 +20180111120052t|20180110120052|request1|1|1|12|4|3|D|20170430|customer3|article1|30 +20180111120052t|20180110120052|request1|1|1|13|4|4|X|20170430|customer3|article2|50 +20180111120052t|20180110120052|request1|1|1|14|4|4||20170430|customer3|article2|60 +20180111120052t|20180110120052|request1|2|1|1|4|4|X|20170430|customer3|article2|60 +20180111120052t|20180110120052|request1|2|1|2|4|4||20170430|customer3|article2|70 +20180111120052t|20180110130103|request2|1|1|3|4|1|X|20170430|customer3|article3|100 +20180111120052t|20180110130103|request2|1|1|4|4|1||20170430|customer3|article3|70 +20180111120052t|20180110130103|request2|1|1|5|4|2|D|20170430|customer3|article7|80 +20180111120052t|20180110130103|request2|1|1|6|4|3|N|20170430|customer3|article1|40 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/params.json b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/params.json index ad243b4..314ded8 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/params.json +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_right_params/params.json @@ -11,6 +11,7 @@ "item" ], "condensation_key": [ + "m3d_timestamp", "actrequest_timestamp", "datapakid", "partno", @@ -37,5 +38,5 @@ ], "partition_column": "date", "partition_column_format": "yyyyMMdd", - "ignore_affected_partitions_merge": true + "affected_partitions_merge": true } \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/init_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/init_data.psv index 489bba4..ccb30d9 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/init_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/init_data.psv @@ -1,20 +1,20 @@ -ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|DATE|CUSTOMER|ARTICLE|RECORDMODE|AMOUNT -00000000000000t|0|0|0|1|1|20160601|customer1|article1||100 -00000000000000t|0|0|0|1|2|20160601|customer1|article2||200 -00000000000000t|0|0|0|1|3|20160601|customer1|article3||50 -00000000000000t|0|0|0|2|1|20170215|customer2|article4||10 -00000000000000t|0|0|0|2|2|20170215|customer2|article6||50 -00000000000000t|0|0|0|2|3|20170215|customer2|article1||30 -00000000000000t|0|0|0|3|1|20170215|customer1|article5||200 -00000000000000t|0|0|0|3|2|20170215|customer1|article2||120 -00000000000000t|0|0|0|3|3|20170215|customer1|article4||90 -00000000000000t|0|0|0|4|1|20170430|customer3|article3||80 -00000000000000t|0|0|0|4|2|20170430|customer3|article7||70 -00000000000000t|0|0|0|4|3|20170430|customer3|article1||30 
-00000000000000t|0|0|0|4|4|20170430|customer3|article2||50 -00000000000000t|0|0|0|5|1|20170510|customer4|article6||150 -00000000000000t|0|0|0|5|2|20170510|customer4|article3||100 -00000000000000t|0|0|0|5|3|20170510|customer4|article5||80 -00000000000000t|0|0|0|6|1|20170601|customer2|article4||100 -00000000000000t|0|0|0|6|2|20170601|customer2|article1||50 -00000000000000t|0|0|0|6|3|20170601|customer2|article2||90 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|DATE|CUSTOMER|ARTICLE|RECORDMODE|AMOUNT +20180110100000t|00000000000000t|0|0|0|1|1|20160601|customer1|article1||100 +20180110100000t|00000000000000t|0|0|0|1|2|20160601|customer1|article2||200 +20180110100000t|00000000000000t|0|0|0|1|3|20160601|customer1|article3||50 +20180110100000t|00000000000000t|0|0|0|2|1|20170215|customer2|article4||10 +20180110100000t|00000000000000t|0|0|0|2|2|20170215|customer2|article6||50 +20180110100000t|00000000000000t|0|0|0|2|3|20170215|customer2|article1||30 +20180110100000t|00000000000000t|0|0|0|3|1|20170215|customer1|article5||200 +20180110100000t|00000000000000t|0|0|0|3|2|20170215|customer1|article2||120 +20180110100000t|00000000000000t|0|0|0|3|3|20170215|customer1|article4||90 +20180110100000t|00000000000000t|0|0|0|4|1|20170430|customer3|article3||80 +20180110100000t|00000000000000t|0|0|0|4|2|20170430|customer3|article7||70 +20180110100000t|00000000000000t|0|0|0|4|3|20170430|customer3|article1||30 +20180110100000t|00000000000000t|0|0|0|4|4|20170430|customer3|article2||50 +20180110100000t|00000000000000t|0|0|0|5|1|20170510|customer4|article6||150 +20180110100000t|00000000000000t|0|0|0|5|2|20170510|customer4|article3||100 +20180110100000t|00000000000000t|0|0|0|5|3|20170510|customer4|article5||80 +20180110100000t|00000000000000t|0|0|0|6|1|20170601|customer2|article4||100 +20180110100000t|00000000000000t|0|0|0|6|2|20170601|customer2|article1||50 +20180110100000t|00000000000000t|0|0|0|6|3|20170601|customer2|article2||90 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/new_data.psv b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/new_data.psv index b5e17f7..731061d 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/new_data.psv +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/new_data.psv @@ -1,21 +1,21 @@ -ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT -20180110120052|request1|1|1|1|7|1|N|20180110|customer5|article2|120 -20180110120052|request1|1|1|2|1|1|X|20160601|customer1|article1|100 -20180110120052|request1|1|1|3|1|1||20160602|customer1|article1|150 -20180110120052|request1|1|1|4|2|2|X|20170215|customer2|article6|50 -20180110120052|request1|1|1|5|2|2||20170216|customer2|article2|50 -20180110120052|request1|1|1|6|3|2|D|20170215|customer1|article2|120 -20180110120052|request1|1|1|7|3|3|R|20170215|customer1|article4|-90 -20180110120052|request1|1|1|8|4|1|X|20170430|customer3|article3|80 -20180110120052|request1|1|1|9|4|1||20170430|customer3|article3|100 -20180110120052|request1|1|1|10|4|2|X|20170430|customer3|article7|70 -20180110120052|request1|1|1|11|4|2||20170430|customer3|article7|80 -20180110120052|request1|1|1|12|4|3|D|20170430|customer3|article1|30 
-20180110120052|request1|1|1|13|4|4|X|20170430|customer3|article2|50 -20180110120052|request1|1|1|14|4|4||20170430|customer3|article2|60 -20180110120052|request1|2|1|1|4|4|X|20170430|customer3|article2|60 -20180110120052|request1|2|1|2|4|4||20170430|customer3|article2|70 -20180110130103|request2|1|1|3|4|1|X|20170430|customer3|article3|100 -20180110130103|request2|1|1|4|4|1||20170430|customer3|article3|70 -20180110130103|request2|1|1|5|4|2|D|20170430|customer3|article7|80 -20180110130103|request2|1|1|6|4|3|N|20170430|customer3|article1|40 \ No newline at end of file +M3D_TIMESTAMP|ACTREQUEST_TIMESTAMP|REQUEST|DATAPAKID|PARTNO|RECORD|SALESORDER|ITEM|RECORDMODE|DATE|CUSTOMER|ARTICLE|AMOUNT +20180111120052t|20180110120052|request1|1|1|1|7|1|N|20180110|customer5|article2|120 +20180111120052t|20180110120052|request1|1|1|2|1|1|X|20160601|customer1|article1|100 +20180111120052t|20180110120052|request1|1|1|3|1|1||20160602|customer1|article1|150 +20180111120052t|20180110120052|request1|1|1|4|2|2|X|20170215|customer2|article6|50 +20180111120052t|20180110120052|request1|1|1|5|2|2||20170216|customer2|article2|50 +20180111120052t|20180110120052|request1|1|1|6|3|2|D|20170215|customer1|article2|120 +20180111120052t|20180110120052|request1|1|1|7|3|3|R|20170215|customer1|article4|-90 +20180111120052t|20180110120052|request1|1|1|8|4|1|X|20170430|customer3|article3|80 +20180111120052t|20180110120052|request1|1|1|9|4|1||20170430|customer3|article3|100 +20180111120052t|20180110120052|request1|1|1|10|4|2|X|20170430|customer3|article7|70 +20180111120052t|20180110120052|request1|1|1|11|4|2||20170430|customer3|article7|80 +20180111120052t|20180110120052|request1|1|1|12|4|3|D|20170430|customer3|article1|30 +20180111120052t|20180110120052|request1|1|1|13|4|4|X|20170430|customer3|article2|50 +20180111120052t|20180110120052|request1|1|1|14|4|4||20170430|customer3|article2|60 +20180111120052t|20180110120052|request1|2|1|1|4|4|X|20170430|customer3|article2|60 +20180111120052t|20180110120052|request1|2|1|2|4|4||20170430|customer3|article2|70 +20180111120052t|20180110130103|request2|1|1|3|4|1|X|20170430|customer3|article3|100 +20180111120052t|20180110130103|request2|1|1|4|4|1||20170430|customer3|article3|70 +20180111120052t|20180110130103|request2|1|1|5|4|2|D|20170430|customer3|article7|80 +20180111120052t|20180110130103|request2|1|1|6|4|3|N|20170430|customer3|article1|40 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/params.json b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/params.json index d70269d..0002395 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/params.json +++ b/src/test/resources/com/adidas/analytics/feature/loads/DeltaLakeLoadTestRes/unstable_partitions_wrong_params/params.json @@ -11,6 +11,7 @@ "item" ], "condensation_key": [ + "m3d_timestamp", "actrequest_timestamp", "datapakid", "partno", @@ -37,5 +38,5 @@ ], "partition_column": "date", "partition_column_format": "yyyyMMdd", - "ignore_affected_partitions_merge": false + "affected_partitions_merge": false } \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/landing/new_data_multi_partition_columns.psv b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/landing/new_data_multi_partition_columns.psv new file mode 100644 index 0000000..e6a6aa3 --- 
/dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/landing/new_data_multi_partition_columns.psv @@ -0,0 +1,25 @@ +1|1||article1|150|customer1|20160601 +1|2||article2|200|customer1|20160601 +1|3||article3|50|customer1|20160601 +2|1||article4|10|customer2|20170215 +2|2||article5|50|customer2|20170215 +2|3||article1|30|customer2|20170215 +3|1||article6|200|customer1|20170215 +3|2||article2|120|customer1|20170215 +3|3||article4|90|customer1|20170215 +4|1||article3|80|customer3|20170430 +4|2||article7|70|customer3|20170430 +4|3||article1|30|customer3|20170430 +4|4||article2|50|customer3|20170430 +5|1||article5|150|customer4|20170510 +5|2||article3|100|customer4|20170510 +5|3||article6|80|customer4|20170510 +6|1||article4|100|customer2|20180601 +6|2||article1|50|customer2|20180601 +6|3||article2|90|customer2|20180601 +7|1|N|article2|120|customer5|20180110 +7|2|N|article4|180|customer5|20180110 +7|3|N|article1|220|customer5|20180110 +8|1|N|article2|200|customer5|20180110 +8|2|N|article4|80|customer5|20180110 +8|3|N|article1|20|customer5|20180110 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/nested_flattener/params_transpose_scenario.json b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/nested_flattener/params_transpose_scenario.json index 12d4e43..b8c8861 100644 --- a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/nested_flattener/params_transpose_scenario.json +++ b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/nested_flattener/params_transpose_scenario.json @@ -10,7 +10,7 @@ "target_partitions": [], "source_dir": "/tmp/tests/test_landing/test/test_table/data", "target_table": "test_lake.test_table", - "is_multiline_json": true, + "multi_line": true, "additional_task": { "nested_task_properties" : { "side_flatten": { diff --git a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/lake_data_post.psv b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/lake_data_post.psv new file mode 100644 index 0000000..e6a6aa3 --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/lake_data_post.psv @@ -0,0 +1,25 @@ +1|1||article1|150|customer1|20160601 +1|2||article2|200|customer1|20160601 +1|3||article3|50|customer1|20160601 +2|1||article4|10|customer2|20170215 +2|2||article5|50|customer2|20170215 +2|3||article1|30|customer2|20170215 +3|1||article6|200|customer1|20170215 +3|2||article2|120|customer1|20170215 +3|3||article4|90|customer1|20170215 +4|1||article3|80|customer3|20170430 +4|2||article7|70|customer3|20170430 +4|3||article1|30|customer3|20170430 +4|4||article2|50|customer3|20170430 +5|1||article5|150|customer4|20170510 +5|2||article3|100|customer4|20170510 +5|3||article6|80|customer4|20170510 +6|1||article4|100|customer2|20180601 +6|2||article1|50|customer2|20180601 +6|3||article2|90|customer2|20180601 +7|1|N|article2|120|customer5|20180110 +7|2|N|article4|180|customer5|20180110 +7|3|N|article1|220|customer5|20180110 +8|1|N|article2|200|customer5|20180110 +8|2|N|article4|80|customer5|20180110 +8|3|N|article1|20|customer5|20180110 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/lake_data_pre.psv 
b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/lake_data_pre.psv new file mode 100644 index 0000000..d3acd6c --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/lake_data_pre.psv @@ -0,0 +1,19 @@ +1|1||article1|150|customer1|20160601 +1|2||article2|200|customer1|20160601 +1|3||article3|50|customer1|20160601 +2|1||article4|10|customer2|20170215 +2|2||article5|50|customer2|20170215 +2|3||article1|30|customer2|20170215 +3|1||article6|200|customer1|20170215 +3|2||article2|120|customer1|20170215 +3|3||article4|90|customer1|20170215 +4|1||article3|80|customer3|20170430 +4|2||article7|70|customer3|20170430 +4|3||article1|30|customer3|20170430 +4|4||article2|50|customer3|20170430 +5|1||article5|150|customer4|20170510 +5|2||article3|100|customer4|20170510 +5|3||article6|80|customer4|20170510 +6|1||article4|100|customer2|20180601 +6|2||article1|50|customer2|20180601 +6|3||article2|90|customer2|20180601 \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/params.json b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/params.json new file mode 100644 index 0000000..35b7261 --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/params.json @@ -0,0 +1,16 @@ +{ + "base_data_dir": "data/", + "backup_dir": "/tmp/tests/test_lake/test/test_table/data_backup/", + "current_dir": "/tmp/tests/test_lake/test/test_table/data/", + "delimiter": "|", + "file_format": "dsv", + "has_header": false, + "partition_column": "", + "partition_column_format": "", + "target_partitions": [ + "customer", + "date" + ], + "source_dir": "/tmp/tests/test_landing/test/test_table/data", + "target_table": "test_lake.test_table" +} \ No newline at end of file diff --git a/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/target_schema.json b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/target_schema.json new file mode 100644 index 0000000..d98e1cd --- /dev/null +++ b/src/test/resources/com/adidas/analytics/feature/loads/FullLoadTestRes/partitioned_multi_columns/target_schema.json @@ -0,0 +1,47 @@ +{ + "type": "struct", + "fields": [ + { + "name": "salesorder", + "type": "integer", + "nullable": true, + "metadata": {} + }, + { + "name": "item", + "type": "integer", + "nullable": true, + "metadata": {} + }, + { + "name": "recordmode", + "type": "string", + "nullable": true, + "metadata": {} + }, + { + "name": "article", + "type": "string", + "nullable": true, + "metadata": {} + }, + { + "name": "amount", + "type": "integer", + "nullable": true, + "metadata": {} + }, + { + "name": "customer", + "type": "string", + "nullable": true, + "metadata": {} + }, + { + "name": "date", + "type": "integer", + "nullable": true, + "metadata": {} + } + ] +} diff --git a/src/test/scala/com/adidas/analytics/feature/loads/AppendLoadTest.scala b/src/test/scala/com/adidas/analytics/feature/loads/AppendLoadTest.scala index 8febe2f..530034b 100644 --- a/src/test/scala/com/adidas/analytics/feature/loads/AppendLoadTest.scala +++ b/src/test/scala/com/adidas/analytics/feature/loads/AppendLoadTest.scala @@ -121,7 +121,7 @@ class AppendLoadTest extends AnyFeatureSpec with BaseAlgorithmTest { fs.exists(headerPath20180422) shouldBe false // executing load - 
assertThrows[SparkException](AppendLoad(spark, dfs, paramsFileHdfsPath.toString).run())
+      assertThrows[RuntimeException](AppendLoad(spark, dfs, paramsFileHdfsPath.toString).run())
     }
 
     Scenario(
@@ -636,6 +636,36 @@ class AppendLoadTest extends AnyFeatureSpec with BaseAlgorithmTest {
       fs.exists(headerPath202020) shouldBe true
       fs.exists(targetPath202020) shouldBe true
     }
+
+    Scenario(
+      "Using Append Load Algorithm to integrate date columns as date format gives exception when input has invalid dates"
+    ) {
+      val testResourceDir = "partitioned_and_date_columns_exception"
+      val headerPath202020 = new Path(headerDirPath, "year=2020/week=20/header.json")
+      val targetPath202020 = new Path(targetDirPath, "year=2020/week=20")
+
+      val targetSchema =
+        DataType
+          .fromJson(getResourceAsText(s"$testResourceDir/target_schema.json"))
+          .asInstanceOf[StructType]
+      val dataReader = FileReader.newDSVFileReader(Some(targetSchema), dateFormat = "MM/dd/yyyy")
+
+      val targetTable = createTargetTable(Seq("year", "week"), targetSchema)
+      setupInitialState(targetTable, s"$testResourceDir/lake_data_pre.psv", dataReader)
+      prepareSourceData(testResourceDir, Seq("new_data.psv"))
+      uploadParameters(testResourceDir)
+
+      // checking pre-conditions
+      spark.read.csv(sourceDirPath.toString).count() shouldBe 4
+      targetTable.read().count() shouldBe 10
+
+      fs.exists(headerPath202020) shouldBe false
+      fs.exists(targetPath202020) shouldBe false
+
+      // executing load
+      assertThrows[SparkException](AppendLoad(spark, dfs, paramsFileHdfsPath.toString).run())
+    }
+
   }
 
   override def beforeEach(): Unit = {
diff --git a/src/test/scala/com/adidas/analytics/feature/loads/DeltaLakeLoadTest.scala b/src/test/scala/com/adidas/analytics/feature/loads/DeltaLakeLoadTest.scala
index 5c79cff..017aa74 100644
--- a/src/test/scala/com/adidas/analytics/feature/loads/DeltaLakeLoadTest.scala
+++ b/src/test/scala/com/adidas/analytics/feature/loads/DeltaLakeLoadTest.scala
@@ -12,9 +12,6 @@ import org.scalatest.matchers.should.Matchers._
 
 class DeltaLakeLoadTest extends AnyFeatureSpec with BaseAlgorithmTest with GivenWhenThen {
 
-  spark.conf.set("spark.executor.instances", "1")
-  spark.conf.set("spark.executor.cores", "2")
-
   private val sourceSystem: String = "sales"
   private val testTableName: String = "orders"
   private val layerLanding: String = "test_landing"
@@ -161,7 +158,7 @@ class DeltaLakeLoadTest extends AnyFeatureSpec with BaseAlgorithmTest with Given
   Feature("Unstable target partitions whose values change over time for several rows") {
 
     Scenario(s"Unstable target partitions, and parameters are not properly configured") {
-      Given("ignore_affected_partitions_merge is false")
+      Given("affected_partitions_merge is false")
 
       val resourceDir = "unstable_partitions_wrong_params"
       copyResourceFileToHdfs(s"$resourceDir/$paramsFile", paramsFileHdfsPath)
@@ -203,7 +200,7 @@ class DeltaLakeLoadTest extends AnyFeatureSpec with BaseAlgorithmTest with Given
     Scenario(s"Unstable target partitions, but parameters are properly configured") {
       val resourceDir = "unstable_partitions_right_params"
 
-      Given("ignore_affected_partitions_merge is true")
+      Given("affected_partitions_merge is true")
       copyResourceFileToHdfs(s"$resourceDir/$paramsFile", paramsFileHdfsPath)
 
       val targetTable = createTable(
diff --git a/src/test/scala/com/adidas/analytics/feature/loads/FullLoadTest.scala b/src/test/scala/com/adidas/analytics/feature/loads/FullLoadTest.scala
index 9b2b995..d53299e 100644
--- a/src/test/scala/com/adidas/analytics/feature/loads/FullLoadTest.scala
+++ b/src/test/scala/com/adidas/analytics/feature/loads/FullLoadTest.scala
@@ -260,6 +260,43 @@ class FullLoadTest extends AnyFeatureSpec with BaseAlgorithmTest {
       )
       fs.exists(targetPath20180110) shouldBe true
     }
+    Scenario("Loading data to table partitioned by multiple non-derived columns") {
+      val resourceDir = "partitioned_multi_columns"
+      copyResourceFileToHdfs(s"$resourceDir/$paramsFileName", paramsFileHdfsPath)
+
+      val targetSchema = DataType
+        .fromJson(getResourceAsText(s"$resourceDir/target_schema.json"))
+        .asInstanceOf[StructType]
+      val dataReader = FileReader.newDSVFileReader(Some(targetSchema))
+
+      val targetTable =
+        createPartitionedTargetTable(Seq("customer", "date"), targetSchema, tableName)
+      var targetPathTestPartition =
+        new Path(targetTable.location, "customer=customer5/date=20180110")
+      setupInitialState(targetTable, s"$resourceDir/lake_data_pre.psv", dataReader)
+      prepareDefaultSourceData("landing/new_data_multi_partition_columns.psv")
+
+      // checking pre-conditions
+      spark.read.csv(sourceDirPath.toString).count() shouldBe 25
+      targetTable.read().count() shouldBe 19
+      fs.exists(targetPathTestPartition) shouldBe false
+
+      // executing load
+      FullLoad(spark, dfs, paramsFileHdfsPath.toString).run()
+
+      // validating result
+      val expectedDataLocation =
+        resolveResource(s"$resourceDir/lake_data_post.psv", withProtocol = true)
+      val expectedDf = dataReader.read(spark, expectedDataLocation)
+      val actualDf = targetTable.read()
+      actualDf.hasDiff(expectedDf) shouldBe false
+
+      targetPathTestPartition = new Path(
+        CatalogTableManager(targetTable.table, spark).getTableLocation,
+        "customer=customer5/date=20180110"
+      )
+      fs.exists(targetPathTestPartition) shouldBe true
+    }
 
     Scenario(
       "Partitioned table is loaded and old leftovers are cleansed properly after successful load"
diff --git a/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala b/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala
index 881ee34..309af00 100644
--- a/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala
+++ b/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala
@@ -169,4 +169,29 @@ class DateComponentDerivationTest
 
     transformedDf.hasDiff(expectedDf) shouldBe false
   }
+  test("Partition by year/month/day with formatter yyyy-MM-dd HH:mm:ss") {
+
+    val sampleDf =
+      Seq(("2020-04-27 10:12:14"), ("2020-04-28 12:00:14"), ("2020-04-29 22:02:03")).toDF("partcol")
+
+    val dateComponentDerivationTester: DataFrame => DataFrame =
+      new DateComponentDerivationSubClass().validateWithDateComponents(
+        sourceDateColumnName = "partcol",
+        sourceDateFormat = "yyyy-MM-dd HH:mm:ss",
+        targetDateComponentColumnNames = Seq("year", "month", "day")
+      )
+
+    val transformedDf = sampleDf.transform(dateComponentDerivationTester)
+
+    val expectedDf =
+      Seq(
+        ("2020-04-27 10:12:14", 2020, 4, 27),
+        ("2020-04-28 12:00:14", 2020, 4, 28),
+        ("2020-04-29 22:02:03", 2020, 4, 29)
+      )
+        .toDF("partcol", "year", "month", "day")
+
+    transformedDf.hasDiff(expectedDf) shouldBe false
+  }
+
 }
diff --git a/src/test/scala/com/adidas/utils/SparkSupport.scala b/src/test/scala/com/adidas/utils/SparkSupport.scala
index fa0c3c8..2e6852e 100644
--- a/src/test/scala/com/adidas/utils/SparkSupport.scala
+++ b/src/test/scala/com/adidas/utils/SparkSupport.scala
@@ -27,7 +27,7 @@ trait SparkSupport extends SparkSessionWrapper {
     new SparkConf(false)
       .set("spark.ui.enabled", "false")
      .set("spark.sql.warehouse.dir", new File(sparkTestDir, "warehouse").getAbsolutePath)
-      .set("spark.sql.shuffle.partitions", "8")
+      .set("spark.sql.shuffle.partitions", "2")
   } { (sparkConf, hadoopConf) =>
     hadoopConf.asScala.foldLeft(sparkConf)((sc, entry) =>
       sc.set(s"spark.hadoop.${entry.getKey}", entry.getValue)
@@ -41,7 +41,7 @@ trait SparkSupport extends SparkSessionWrapper {
       .builder()
       .config(sparkConf)
       .appName(s"test-${getClass.getName}")
-      .master("local[*]")
+      .master("local[2]")
       .enableHiveSupport()
       .getOrCreate()
   }