migration to spark 3 (#5)
Co-authored-by: Costa, Carlos <->
epilif1017a authored May 4, 2021
1 parent c22d447 commit 3875d93
Showing 53 changed files with 798 additions and 379 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -21,3 +21,4 @@ metastore_db/
# Scala-IDE specific
.scala_dependencies
.worksheet
/.bsp/sbt.json
46 changes: 46 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,46 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [6.0.0] - 2021-05
### Migrated
- Spark 3.0 Migration
* Migrate to Spark version 3.0.1, Hadoop 3.2.1 and Scala 2.12
* Spark 3 uses the Proleptic Gregorian calendar.
In case there are problems when data sources have dates before 1582 or other problematic formats, as a quick fix we can set the following Spark parameters in the pipelines:
```
"spark.sql.legacy.timeParserPolicy": "LEGACY", "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY", "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "LEGACY"
```
An example of an exception related to parsing dates and timestamps looks like this:
```
SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '00/00/0000' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
```
Note 1: there are also two other exceptions we observed related to reading or writing Parquet files with old date/time formats.
They look very similar to the Spark upgrade exception above, but highlight the need to change the respective spark.sql.legacy.parquet.datetimeRebaseModeInXXXXX property.
Note 2: the solution provided above should cover all the exceptions enumerated here for a given data source.
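
A minimal sketch of applying these quick-fix properties when building the `SparkSession` (spark-shell style; the app name is illustrative, and in this project the session is normally assembled in `AlgorithmFactory`):
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("legacy-datetime-sketch") // illustrative app name
  // parse dates/timestamps with the pre-Spark-3 parser
  .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
  // rebase old dates/timestamps when writing and reading Parquet
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
  .getOrCreate()
```
The same three keys can alternatively be passed as `--conf` options to `spark-submit` when changing the session builder is not an option.
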
## [5.8.0] - 2020-04
### Added
- Fix reconciliation execution time by removing an unneeded caching stage.
## [5.7.5] - 2020-04
### Added
- Enable multi-line option for append loads (see the read sketch below)
- Fix duplicate issues generated by the latest changes applied to CompetitorDataPreprocessor
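
A rough sketch of what the multi-line option amounts to at the `DataFrameReader` level, assuming a spark-shell style session where `spark` is already available (paths and options are illustrative; the actual wiring lives in the load algorithms and format helpers):
```
// JSON documents whose records span several physical lines
val jsonDf = spark.read
  .option("multiLine", "true")
  .json("hdfs:///landing/events/") // illustrative path

// DSV files with embedded newlines inside quoted fields
val dsvDf = spark.read
  .option("multiLine", "true")
  .option("delimiter", "|")
  .option("header", "true")
  .csv("hdfs:///landing/competitors/") // illustrative path
```
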
## [5.7.2] - 2021-02
### Added
- Make init condensation optional, but true by default.
## [5.7.1] - 2020-02
### Added
- Modify append load to support more complex partitioning strategies without file_regex
- Added support for configuring the write load mode and the number of output files in append load
- Support for specifying the quote and escape characters. More info on how to specify these options: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html
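
A minimal sketch of overriding the quote and escape characters on a `DataFrameReader`, again assuming `spark` is already available (delimiter, characters and path are illustrative):
```
val df = spark.read
  .option("delimiter", ";")
  .option("quote", "\"")  // character that wraps quoted values
  .option("escape", "\\") // character that escapes quote characters inside quoted values
  .option("header", "true")
  .csv("hdfs:///landing/sales/") // illustrative path
```
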
## [5.7.0] - 2020-01
### Added
- Support for multiple partition attributes (non-date-derived) and single non-date-derived partition attributes.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM amazonlinux:2.0.20200722.0
FROM amazonlinux:2.0.20210326.0

ARG JAVA_VERSION=1.8.0

@@ -17,4 +17,4 @@ RUN groupadd -r m3d && \
chown m3d:m3d /home/m3d
USER m3d

CMD ["/bin/bash"]
CMD ["/bin/bash"]
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
M3D Engine
=======

![M3D logo](/static/images/m3d_logo.png)
![M3DLogo](static/images/m3d_logo.png)

**M3D** stands for _Metadata Driven Development_ and is a cloud- and platform-agnostic framework for the automated creation, management and governance of metadata and data flows from multiple source systems to multiple target systems. The main features and design goals of M3D are:

27 changes: 15 additions & 12 deletions build.sbt
@@ -1,19 +1,20 @@
import sbt.ExclusionRule

name := "m3d-engine"
version := "1.0"
version := "6.0.0"

scalaVersion := "2.11.12"
scalaVersion := "2.12.13"
addCompilerPlugin("org.scalameta" % "semanticdb-scalac" % "4.4.11" cross CrossVersion.full)
scalacOptions += "-Yrangepos"
semanticdbEnabled := true
semanticdbVersion := scalafixSemanticdb.revision
scalacOptions += "-Ywarn-unused-import"

val sparkVersion = "2.4.4"
val hadoopVersion = "2.8.5"
val sparkVersion = "3.0.1"
val hadoopVersion = "3.2.1"

conflictManager := sbt.ConflictManager.latestRevision

mainClass in Compile := Some("com.adidas.analytics.AlgorithmFactory")
Compile / mainClass := Some("com.adidas.analytics.AlgorithmFactory")

/* =====================
* Dependencies
@@ -32,21 +33,23 @@ libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion % Provi
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided withExclusions Vector(
ExclusionRule("io.netty", "netty-all")
)

libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % Provided
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs-client" % hadoopVersion % Provided
libraryDependencies += "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion % Provided

libraryDependencies += "joda-time" % "joda-time" % "2.9.3" % Provided
libraryDependencies += "joda-time" % "joda-time" % "2.10.10" % Provided
libraryDependencies += "org.joda" % "joda-convert" % "2.2.1"

libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.30"

libraryDependencies += "io.delta" %% "delta-core" % "0.6.1"
libraryDependencies += "io.delta" %% "delta-core" % "0.8.0"

/* =====================
* Dependencies for test
* ===================== */

libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.1" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.7" % Test

libraryDependencies +=
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % Test classifier "tests" withExclusions Vector(
@@ -57,10 +60,10 @@ libraryDependencies +=
ExclusionRule("io.netty", "netty-all")
)

fork in Test := true
Test / fork := true

// disable parallel execution
parallelExecution in Test := false
Test / parallelExecution := false

// skipping tests when running assembly
test in assembly := {}
assembly / test := {}
3 changes: 1 addition & 2 deletions common.sh
@@ -1,5 +1,4 @@
#!/bin/bash

#!/bin/sh
function array_contains() {
local LOCAL_NEEDLE=$1
shift
18 changes: 14 additions & 4 deletions dev-env.sh
@@ -1,5 +1,4 @@
#!/bin/bash

#!/bin/sh
set -e

SCRIPT_NAME="dev-env.sh"
@@ -15,6 +14,7 @@ OPTION_INTERACTIVE=("interactive" "i" "use interactive mode and allocate pseudo-
ARG_ACTION_IMAGE_BUILD=("image-build" "build the docker image")
ARG_ACTION_CONTAINER_RUN=("container-run" "run a container from the docker image")
ARG_ACTION_CONTAINER_EXECUTE=("container-execute" "execute an external command within the container")
ARG_ACTION_CONTAINER_START=("container-start" "start the container")
ARG_ACTION_CONTAINER_STOP=("container-stop" "stop the container")
ARG_ACTION_CONTAINER_DELETE=("container-delete" "delete the container")
ARG_ACTION_PROJECT_FORMAT=("project-format" "format the code")
@@ -28,6 +28,7 @@ AVAILABLE_ACTIONS=(
"$ARG_ACTION_IMAGE_BUILD"
"$ARG_ACTION_CONTAINER_RUN"
"$ARG_ACTION_CONTAINER_EXECUTE"
"$ARG_ACTION_CONTAINER_START"
"$ARG_ACTION_CONTAINER_STOP"
"$ARG_ACTION_CONTAINER_DELETE"
"$ARG_ACTION_PROJECT_FORMAT"
@@ -47,6 +48,7 @@ function create_actions_help_string() {
"ARG_ACTION_IMAGE_BUILD" \
"ARG_ACTION_CONTAINER_RUN" \
"ARG_ACTION_CONTAINER_EXECUTE" \
"ARG_ACTION_CONTAINER_START" \
"ARG_ACTION_CONTAINER_STOP" \
"ARG_ACTION_CONTAINER_DELETE" \
"ARG_ACTION_PROJECT_ASSEMBLE" \
@@ -175,6 +177,14 @@ elif [[ "$ACTION" == "$ARG_ACTION_CONTAINER_RUN" ]]; then
docker run -t -d --name "$CONTAINER_INSTANCE_NAME" -v "${WORKSPACE}:/m3d/workspace/${PROJECT_NAME}" -p 5005:5005 "$CONTAINER_IMAGE_NAME"
fi

# start the container
elif [[ "$ACTION" == "$ARG_ACTION_CONTAINER_START" ]]; then
echo "Starting dev environment ..."
validate_args_are_empty "$HELP_STRING" "${OTHER_ARGS[@]}"

echo "Starting the container ..."
(docker start "$CONTAINER_INSTANCE_NAME" 1>/dev/null && echo "The container was started")

# cleanup files generated by SBT
elif [[ "$ACTION" == "$ARG_ACTION_PROJECT_CLEAN" ]]; then
echo "Deleting files generated by SBT ..."
@@ -221,12 +231,12 @@ elif [[ "$ACTION" == "$ARG_ACTION_PROJECT_TEST" ]]; then
validate_args_are_empty "$HELP_STRING" "${OTHER_ARGS[@]}"

if [[ -z "$DEBUG" ]]; then
SBT_OPTS="-Xms512M -Xmx512M"
SBT_OPTS="-Xms512M -Xmx512M -XX:+UseG1GC"
SBT_CMD="SBT_OPTS=\"${SBT_OPTS}\" sbt \"test:testOnly ${TEST_FILTER}\""
exec_command_within_container "$CONTAINER_INSTANCE_NAME" "$PROJECT_NAME" "$SBT_CMD" "$INTERACTIVE"
else
echo "Debugging is enabled"
SBT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms512M -Xmx512M"
SBT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms512M -Xmx512M -XX:+UseG1GC"
SBT_CMD="SBT_OPTS=\"${SBT_OPTS}\" sbt \";set Test / fork := false; test:testOnly ${TEST_FILTER}\""
exec_command_within_container "$CONTAINER_INSTANCE_NAME" "$PROJECT_NAME" "$SBT_CMD" 1
fi
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version = 1.3.13
sbt.version = 1.5.0
6 changes: 5 additions & 1 deletion src/main/scala/com/adidas/analytics/AlgorithmFactory.scala
@@ -44,9 +44,13 @@ object AlgorithmFactory {
.config("hive.stats.fetch.column.stats", "true")
.config("hive.stats.fetch.partition.stats", "true")
.config("spark.sql.parquet.compression.codec", "snappy")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
.config("spark.sql.csv.parser.columnPruning.enabled", "false")
/* We maintain this option to ensure that Hive and Spark < 3.0 downstream consumers can
* properly read the Parquet data with appropriate data types and date formats. */
.config("spark.sql.parquet.writeLegacyFormat", "true")
// Configs for delta lake load... more specifics added in the DeltaLakeLoad config
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.enableHiveSupport()
.getOrCreate()

@@ -45,7 +45,7 @@ final class GzipDecompressor protected (
Future.sequence(
fileSystem
.ls(inputDirectoryPath, recursive)
.filterNot(path => fileSystem.isDirectory(path))
.filterNot(path => fileSystem.getFileStatus(path).isDirectory)
.map { compressedFilePath =>
Future {
val decompressedFilePath =
@@ -54,9 +54,8 @@
if (compressedFilePath.getName.endsWith(".zip")) {
val zin = new ZipInputStream(fileSystem.open(compressedFilePath))
/* Warning: we intentionally only support zip files with one entry here as we want
* to control */
/* the output name and can not merge multiple entries because they may have
* headers. */
* to control the output name and can not merge multiple entries because they may
* have headers. */
zin.getNextEntry
zin
} else {
30 changes: 21 additions & 9 deletions src/main/scala/com/adidas/analytics/algo/loads/AppendLoad.scala
@@ -14,6 +14,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.immutable

/** Performs append load of new records to an existing table.
@@ -27,7 +28,18 @@ final class AppendLoad protected (
with TableStatistics
with DateComponentDerivation {

override protected def read(): Vector[DataFrame] = readInputData(targetSchema, spark, dfs)
override protected def read(): Vector[DataFrame] =
if (regexFilename.nonEmpty || partitionSourceColumn.nonEmpty)
readInputData(targetSchema, spark, dfs)
else
Vector(
readInputFiles(
Seq(inputDir),
fileFormat,
if (!verifySchema) Some(targetSchema) else None,
spark.read.options(sparkReaderOptions)
)
)

override protected def transform(dataFrames: Vector[DataFrame]): Vector[DataFrame] =
if (partitionSourceColumn.nonEmpty)
@@ -80,7 +92,7 @@
): Seq[Source] = {
logger.info(s"Looking for input files in $inputDirPath")
val targetSchemaWithoutTargetPartitions =
getSchemaWithouttargetPartitions(targetSchema, targetPartitions.toSet)
getSchemaWithoutTargetPartitions(targetSchema, targetPartitions.toSet)
if (partitionSourceColumn.nonEmpty)
fs.ls(inputDirPath, recursive = true)
.map(sourcePath => Source(targetSchemaWithoutTargetPartitions, sourcePath.toString))
@@ -169,19 +181,19 @@
private def readSources(sources: Seq[Source], spark: SparkSession): Vector[DataFrame] =
groupSourcesBySchema(sources).map {
case (schema, inputPaths) =>
readInputFiles(inputPaths, fileFormat, schema, spark.read.options(sparkReaderOptions))
readInputFiles(inputPaths, fileFormat, Some(schema), spark.read.options(sparkReaderOptions))
}.toVector

private def readInputFiles(
inputPaths: Seq[String],
fileFormat: String,
schema: StructType,
schema: Option[StructType],
reader: DataFrameReader
): DataFrame =
fileFormat match {
case "dsv" => DSVFormat(Some(schema)).read(reader, inputPaths: _*)
case "parquet" => ParquetFormat(Some(schema)).read(reader, inputPaths: _*)
case "json" => JSONFormat(Some(schema)).read(reader, inputPaths: _*)
case "dsv" => DSVFormat(schema, multiLine = isMultiline).read(reader, inputPaths: _*)
case "parquet" => ParquetFormat(schema).read(reader, inputPaths: _*)
case "json" => JSONFormat(schema, multiLine = isMultiline).read(reader, inputPaths: _*)
case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat")
}
}
@@ -197,7 +209,7 @@ object AppendLoad {
private def extractPathWithoutServerAndProtocol(path: String): String =
path.replaceFirst("\\w+\\d*://.+?/", "")

private def getSchemaWithouttargetPartitions(
private def getSchemaWithoutTargetPartitions(
targetSchema: StructType,
targetPartitions: Set[String]
): StructType =
@@ -263,7 +275,7 @@
val fs = dfs.getFileSystem(headerDirPath)
dataFrames.foreach { df =>
val schemaJson =
getSchemaWithouttargetPartitions(df.schema, targetPartitions.toSet).prettyJson
getSchemaWithoutTargetPartitions(df.schema, targetPartitions.toSet).prettyJson
df.collectPartitions(targetPartitions).foreach { partitionCriteria =>
val subdirectories = DataFrameUtils.mapPartitionsToDirectories(partitionCriteria)
val headerPath = new Path(headerDirPath.join(subdirectories), headerFileName)
31 changes: 21 additions & 10 deletions src/main/scala/com/adidas/analytics/algo/loads/DeltaLakeLoad.scala
@@ -28,7 +28,7 @@ final class DeltaLakeLoad protected (
case "parquet" => ParquetFormat()
case "dsv" => DSVFormat()
case "json" =>
JSONFormat(multiLine = isMultilineJSON.getOrElse(false), optionalSchema = readJsonSchema)
JSONFormat(multiLine = isMultiline, optionalSchema = readJsonSchema)
case _ => throw new RuntimeException(s"Unsupported input data format $fileFormat.")
}

@@ -81,12 +81,14 @@

deltaTableDF
} else {
val condensedNewDataDF = condenseNewData(newDataDF, initLoad = true)

affectedPartitions = condensedNewDataDF.collectPartitions(targetPartitions)

initDeltaTable(condensedNewDataDF)
val initDataDF =
if (initCondensation)
condenseNewData(newDataDF, initLoad = true)
else
newDataDF

affectedPartitions = initDataDF.collectPartitions(targetPartitions)
initDeltaTable(initDataDF)
DeltaTable.forPath(deltaTableDir).toDF
}

@@ -251,7 +253,10 @@

/** Given a collection of column names it builds the adequate delta merge match condition. E.g.:
* Given Seq("sales_id", "sales_date") it returns currentData.sales_id = newData.sales_id AND
* currentData.sales_date = newData.sales_date
* currentData.sales_date = newData.sales_date. If affectedPartitionsMerge is true, then the
* merge condition will include the columns of the business key plus a filter with the affected
* partitions, otherwise it will include the columns of the business key plus the partition
* columns.
*
* @param columns
* list of column names to build the match condition
Expand All @@ -264,9 +269,15 @@ final class DeltaLakeLoad protected (

targetPartitions match {
case _ :: _ =>
if (!ignoreAffectedPartitionsMerge) generateCondition(businessKey.union(targetPartitions))
else
s"${generateCondition(businessKey)} $logicalOperator (${generateAffectedPartitionsWhere(forceAdditionOfNullPartitionCriteria(affectedPartitions, targetPartitions), s"$currentDataAlias.")})"
if (!affectedPartitionsMerge)
generateCondition(businessKey.union(targetPartitions))
else {
val affectedPartitionsWhere = generateAffectedPartitionsWhere(
forceAdditionOfNullPartitionCriteria(affectedPartitions, targetPartitions),
s"$currentDataAlias."
)
s"${generateCondition(businessKey)} $logicalOperator ($affectedPartitionsWhere)"
}
case _ => generateCondition(businessKey)
}
}