Feature/1416 S3 input/output for Enceladus on EMR (S2.4, H2.8.5) #1483

Merged: 7 commits, Aug 24, 2020
49 changes: 46 additions & 3 deletions pom.xml
@@ -145,18 +145,19 @@
<!--dependency versions-->
<abris.version>3.1.1</abris.version>
<atum.version>0.2.6</atum.version>
<aws.java.sdk.version>2.13.65</aws.java.sdk.version>
<bower.chart.js.version>2.7.3</bower.chart.js.version>
<bson.codec.jsr310.version>3.5.4</bson.codec.jsr310.version>
<cobrix.version>2.1.0</cobrix.version>
<cronstrue.version>1.79.0</cronstrue.version>
<diffson.version>2.0.2</diffson.version>
<gson.version>2.8.2</gson.version>
<guava.version>27.0.1-jre</guava.version>
<hadoop.version>2.7.7</hadoop.version>
<hadoop.version>2.8.5</hadoop.version>
<htrace.version>3.1.0-incubating</htrace.version>
<httpclient.version>4.4.1</httpclient.version>
<jackson.spark.datatype.version>2.6.7</jackson.spark.datatype.version>
<jackson.spark.version>2.6.7.1</jackson.spark.version>
<jackson.spark.datatype.version>2.10.4</jackson.spark.datatype.version>
<jackson.spark.version>2.10.4</jackson.spark.version>
<jackson.version>2.9.8</jackson.version>
<jjwt.version>0.10.7</jjwt.version>
<junit.version>4.11</junit.version>
@@ -202,6 +203,18 @@
<log.specialfilters.acceptonmatch>false</log.specialfilters.acceptonmatch>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${aws.java.sdk.version}</version>
Contributor Author:
Note that the SDK is not used yet; the dependency is added now because its usage is expected based on the S3 PoC.

<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
@@ -225,6 +238,36 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<!-- using AWS SDK v2 instead of v1 -->
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</exclusion>
<!-- contains older version than supplied otherwise -->
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
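As context for the dependency changes above, here is a minimal, hypothetical sketch (not part of this PR) of what the Hadoop 2.8.5 bump and the provided-scope hadoop-aws artifact enable from Spark on EMR: reading and writing Parquet against s3a:// paths. The bucket and paths below are placeholders.

import org.apache.spark.sql.SparkSession

// Hypothetical bucket and paths, for illustration only.
val spark = SparkSession.builder().appName("enceladus-s3-sketch").getOrCreate()

// hadoop-aws supplies the s3a filesystem implementation; on EMR, credentials are
// typically resolved from the instance profile, so no access keys are configured here.
val standardized = spark.read.parquet("s3a://example-bucket/std/dataset/v1")
standardized.write.parquet("s3a://example-bucket/publish/dataset/v1")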
@@ -42,7 +42,7 @@ import za.co.absa.enceladus.utils.performance.PerformanceMeasurer
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Random, Success, Try}

trait CommonJobExecution {

@@ -103,11 +103,13 @@ trait CommonJobExecution {
protected def validateOutputPath(fsUtils: FileSystemVersionUtils, pathConfig: PathConfig): Unit

protected def validateIfPathAlreadyExists(fsUtils: FileSystemVersionUtils, path: String): Unit = {
if (fsUtils.hdfsExists(path)) {
throw new IllegalStateException(
s"Path $path already exists. Increment the run version, or delete $path"
)
}
// TODO fix for s3 [ref issue #1416]

// if (fsUtils.hdfsExists(path)) {
// throw new IllegalStateException(
// s"Path $path already exists. Increment the run version, or delete $path"
// )
// }
}
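For context, a hedged sketch of what the disabled existence check could look like once the S3 fix for issue #1416 lands, using the AWS SDK v2 s3 artifact added in the pom.xml above. The bucket/prefix split and the helper name are assumptions, not code from this repository.

import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request

// Hypothetical helper: treats "at least one object under the prefix" as "path exists".
def s3PathExists(s3: S3Client, bucket: String, prefix: String): Boolean = {
  val request = ListObjectsV2Request.builder()
    .bucket(bucket)
    .prefix(prefix)
    .maxKeys(1)
    .build()
  s3.listObjectsV2(request).keyCount() > 0
}

// Usage sketch mirroring the commented-out HDFS check:
// if (s3PathExists(S3Client.create(), "example-bucket", "publish/dataset/v1")) {
//   throw new IllegalStateException("Path already exists. Increment the run version, or delete it")
// }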

protected def runPostProcessing[T](sourcePhase: SourcePhase, preparationResult: PreparationResult, jobCmdConfig: JobConfigParser[T])
@@ -117,6 +119,7 @@
case _ => preparationResult.pathCfg.publishPath
}

log.info(s"rereading outputPath $outputPath to run postProcessing")
val df = spark.read.parquet(outputPath)
val runId = MenasPlugin.runNumber

@@ -127,8 +130,8 @@
}.mkString(",")
}

val sourceSystem = Atum.getControlMeasure.metadata.sourceApplication
val uniqueRunId = Atum.getControlMeasure.runUniqueId
val sourceSystem = "source1" //Atum.getControlMeasure.metadata.sourceApplication // TODO fix for s3 [ref issue #1416]
val uniqueRunId = Some(s"runId-${Math.abs(Random.nextLong())}") //Atum.getControlMeasure.runUniqueId // TODO fix for s3 [ref issue #1416]

val params = ErrorSenderPluginParams(jobCmdConfig.datasetName,
jobCmdConfig.datasetVersion, jobCmdConfig.reportDate, preparationResult.reportVersion, outputPath,
@@ -52,7 +52,7 @@ object MenasPlugin {
isJobStageOnly,
generateNewRun)
listener = Option(eventListener)
PluginManager.loadPlugin(eventListener)
//PluginManager.loadPlugin(eventListener) // TODO fix for s3 [ref issue #1416]
}

def runUniqueId: Option[String] = listener.flatMap(_.runUniqueId)
@@ -52,8 +52,8 @@ trait ConformanceExecution extends CommonJobExecution {
cmd: ConformanceConfigParser[T],
fsUtils: FileSystemVersionUtils,
spark: SparkSession): Unit = {
val stdDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.standardizationPath)
preparationResult.performance.startMeasurement(stdDirSize)
//val stdDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.standardizationPath)
//preparationResult.performance.startMeasurement(stdDirSize) // TODO fix for s3 [ref issue #1416]

log.info(s"standardization path: ${preparationResult.pathCfg.standardizationPath}")
log.info(s"publish path: ${preparationResult.pathCfg.publishPath}")
@@ -67,11 +67,13 @@
}

// InputPath is standardizationPath in the combined job
spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.standardizationPath}/_INFO")
.setControlMeasuresWorkflow(sourceId.toString)
// TODO fix for s3 [ref issue #1416]
// spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.standardizationPath}/_INFO")
// .setControlMeasuresWorkflow(sourceId.toString)

// Enable control framework performance optimization for pipeline-like jobs
Atum.setAllowUnpersistOldDatasets(true)
// TODO fix for s3 [ref issue #1416]
//Atum.setAllowUnpersistOldDatasets(true)

// Enable Menas plugin for Control Framework
MenasPlugin.enableMenas(
@@ -105,16 +107,16 @@
implicit val featureSwitcher: FeatureSwitches = conformanceReader.readFeatureSwitches()

Try {
handleControlInfoValidation()
// handleControlInfoValidation() // TODO fix for s3 [ref issue #1416]
DynamicInterpreter.interpret(preparationResult.dataset, inputData)
} match {
case Failure(e: ValidationException) =>
AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, e.techDetails)
// AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, e.techDetails) // TODO fix for s3 [ref issue #1416]
throw e
case Failure(NonFatal(e)) =>
val sw = new StringWriter
e.printStackTrace(new PrintWriter(sw))
AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, sw.toString)
// AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, sw.toString) // TODO fix for s3 [ref issue #1416]
throw e
case Success(conformedDF) =>
if (SchemaUtils.fieldExists(Constants.EnceladusRecordId, conformedDF.schema)) {
@@ -134,47 +136,53 @@
fsUtils: FileSystemVersionUtils): Unit = {
val cmdLineArgs: String = args.mkString(" ")

PerformanceMetricTools.addJobInfoToAtumMetadata(
"conform",
preparationResult.pathCfg.standardizationPath,
preparationResult.pathCfg.publishPath,
menasCredentials.username, cmdLineArgs
)
// TODO fix for s3 [ref issue #1416]
// PerformanceMetricTools.addJobInfoToAtumMetadata(
// "conform",
// preparationResult.pathCfg.standardizationPath,
// preparationResult.pathCfg.publishPath,
// menasCredentials.username, cmdLineArgs
// )

val withPartCols = result
.withColumnIfDoesNotExist(InfoDateColumn, to_date(lit(cmd.reportDate), ReportDateFormat))
.withColumnIfDoesNotExist(InfoDateColumnString, lit(cmd.reportDate))
.withColumnIfDoesNotExist(InfoVersionColumn, lit(preparationResult.reportVersion))

val recordCount = result.lastCheckpointRowCount match {
case None => withPartCols.count
case Some(p) => p
}
// TODO fix for s3 [ref issue #1416]
val recordCount = -1
// val recordCount = result.lastCheckpointRowCount match {
// case None => withPartCols.count
// case Some(p) => p
// }
if (recordCount == 0) {
handleEmptyOutput(SourcePhase.Conformance)
}

// ensure the whole path but version exists
fsUtils.createAllButLastSubDir(preparationResult.pathCfg.publishPath)
//fsUtils.createAllButLastSubDir(preparationResult.pathCfg.publishPath) // TODO fix for s3 [ref issue #1416]

withPartCols.write.parquet(preparationResult.pathCfg.publishPath)

val publishDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.publishPath)
preparationResult.performance.finishMeasurement(publishDirSize, recordCount)
PerformanceMetricTools.addPerformanceMetricsToAtumMetadata(
spark,
"conform",
preparationResult.pathCfg.standardizationPath,
preparationResult.pathCfg.publishPath,
menasCredentials.username, cmdLineArgs
)

withPartCols.writeInfoFile(preparationResult.pathCfg.publishPath)
writePerformanceMetrics(preparationResult.performance, cmd)

if (conformanceReader.isAutocleanStdFolderEnabled()) {
fsUtils.deleteDirectoryRecursively(preparationResult.pathCfg.standardizationPath)
}
// TODO fix for s3 [ref issue #1416]
//val publishDirSize = fsUtils.getDirectorySize(preparationResult.pathCfg.publishPath)
// preparationResult.performance.finishMeasurement(publishDirSize, recordCount)
// PerformanceMetricTools.addPerformanceMetricsToAtumMetadata(
// spark,
// "conform",
// preparationResult.pathCfg.standardizationPath,
// preparationResult.pathCfg.publishPath,
// menasCredentials.username, cmdLineArgs
// )

// TODO fix for s3 [ref issue #1416]
//withPartCols.writeInfoFile(preparationResult.pathCfg.publishPath)
//writePerformanceMetrics(preparationResult.performance, cmd)

// TODO fix for s3 [ref issue #1416]
// if (conformanceReader.isAutocleanStdFolderEnabled()) {
// fsUtils.deleteDirectoryRecursively(preparationResult.pathCfg.standardizationPath)
// }
log.info(s"$sourceId finished successfully")
}
}
@@ -55,11 +55,11 @@ object DynamicInterpreter {
implicit val interpreterContext: InterpreterContext = InterpreterContext(inputDf.schema, conformance,
featureSwitches, jobShortName, spark, dao, InterpreterContextArgs.fromConformanceConfig(progArgs))

applyCheckpoint(inputDf, "Start")
// applyCheckpoint(inputDf, "Start") // TODO fix for s3 [ref issue #1416]

val conformedDf = applyConformanceRules(ensureErrorColumnExists(inputDf))

applyCheckpoint(conformedDf, "End")
// applyCheckpoint(conformedDf, "End") // TODO fix for s3 [ref issue #1416]
logExecutionPlan(conformedDf)

conformedDf