#1416 - enceladus on S3 - (crude) conformance works on s3 (s3 std input, s3 conf output)

0 - all code that touches HDFS directly is disabled for now (Atum control measures, performance measurements, _INFO files, output path checking)
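
Since every call into these HDFS-only features is simply commented out in the diff below, here is a minimal sketch of the kind of guard they could eventually sit behind; the isS3Path helper and the URI prefixes it checks are illustrative assumptions, not part of this commit:

// Hypothetical helper (not in this commit): detect S3 output paths so that
// HDFS-only features (Atum tracking, _INFO files, output path checks) can be
// skipped selectively instead of being commented out wholesale.
object PathUtils {
  def isS3Path(path: String): Boolean =
    path.startsWith("s3://") || path.startsWith("s3a://") || path.startsWith("s3n://")
}

// Usage sketch: enable control-measure tracking only for non-S3 output.
// if (!PathUtils.isS3Path(stdPath)) {
//   spark.enableControlMeasuresTracking(s"$stdPath/_INFO")
//     .setControlMeasuresWorkflow(sourceId.toString)
// }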

# Add menasfargate into hosts
sudo nano /etc/hosts
# paste
20.0.63.69 menasfargate
# save & exit (ctrl+O, ctrl+X)

# Running conformance works via:
spark-submit --class za.co.absa.enceladus.conformance.DynamicConformanceJob --conf "spark.driver.extraJavaOptions=-Dmenas.rest.uri=http://menasfargate:8080 -Dstandardized.hdfs.path=s3://euw1-ctodatadev-dev-bigdatarnd-s3-poc/enceladusPoc/ao-hdfs-data/stdOutput/standardized-{0}-{1}-{2}-{3}" ~/enceladusPoc/spark-jobs-2.11.0-SNAPSHOT.jar --menas-credentials-file ~/enceladusPoc/menas-credentials.properties --dataset-name dk_test1_emr285 --dataset-version 1 --report-date 2019-11-27 --report-version 1 2> ~/enceladusPoc/conf-log.txt
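
For orientation only (this is not code from the commit): the -D options passed through spark.driver.extraJavaOptions above surface as plain JVM system properties on the driver. The sketch below assumes the {0}-{3} placeholders stand for dataset name, dataset version, report date and report version:

import java.text.MessageFormat

// Illustrative sketch: read the -D options set via spark.driver.extraJavaOptions.
object SubmitOptionsSketch {
  def main(args: Array[String]): Unit = {
    val menasUri = sys.props.getOrElse("menas.rest.uri", "http://localhost:8080")
    val stdPathTemplate = sys.props.getOrElse("standardized.hdfs.path",
      "/tmp/standardized-{0}-{1}-{2}-{3}")

    // Assumed placeholder order: dataset name, dataset version, report date, report version.
    val stdPath = MessageFormat.format(stdPathTemplate,
      "dk_test1_emr285", "1", "2019-11-27", "1")

    println(s"Menas REST URI: $menasUri")
    println(s"Standardized data path: $stdPath")
  }
}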
dk1844 committed Aug 12, 2020
1 parent f0ebce6 commit da8d4b8
Showing 2 changed files with 17 additions and 13 deletions.
@@ -67,11 +67,13 @@ trait ConformanceExecution extends CommonJobExecution {
     }
 
     // InputPath is standardizationPath in the combined job
-    spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.standardizationPath}/_INFO")
-      .setControlMeasuresWorkflow(sourceId.toString)
+    // TODO fix for s3
+    // spark.enableControlMeasuresTracking(s"${preparationResult.pathCfg.standardizationPath}/_INFO")
+    //  .setControlMeasuresWorkflow(sourceId.toString)
 
     // Enable control framework performance optimization for pipeline-like jobs
-    Atum.setAllowUnpersistOldDatasets(true)
+    // TODO fix for s3
+    //Atum.setAllowUnpersistOldDatasets(true)
 
     // Enable Menas plugin for Control Framework
     MenasPlugin.enableMenas(
@@ -105,16 +107,16 @@ trait ConformanceExecution extends CommonJobExecution {
     implicit val featureSwitcher: FeatureSwitches = conformanceReader.readFeatureSwitches()
 
     Try {
-      handleControlInfoValidation()
+      // handleControlInfoValidation() // TODO fix for s3
      DynamicInterpreter.interpret(preparationResult.dataset, inputData)
     } match {
       case Failure(e: ValidationException) =>
-        AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, e.techDetails)
+        // AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, e.techDetails) // TODO fix for s3
         throw e
       case Failure(NonFatal(e)) =>
         val sw = new StringWriter
         e.printStackTrace(new PrintWriter(sw))
-        AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, sw.toString)
+        // AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(sourceId.toString, e.getMessage, sw.toString) // TODO fix for s3
         throw e
       case Success(conformedDF) =>
         if (SchemaUtils.fieldExists(Constants.EnceladusRecordId, conformedDF.schema)) {
@@ -147,16 +149,18 @@ trait ConformanceExecution extends CommonJobExecution {
       .withColumnIfDoesNotExist(InfoDateColumnString, lit(cmd.reportDate))
       .withColumnIfDoesNotExist(InfoVersionColumn, lit(preparationResult.reportVersion))
 
-    val recordCount = result.lastCheckpointRowCount match {
-      case None => withPartCols.count
-      case Some(p) => p
-    }
+    // TODO fix for s3
+    val recordCount = 100
+    // val recordCount = result.lastCheckpointRowCount match {
+    // case None => withPartCols.count
+    // case Some(p) => p
+    // }
     if (recordCount == 0) {
       handleEmptyOutput(SourcePhase.Conformance)
     }
 
     // ensure the whole path but version exists
-    fsUtils.createAllButLastSubDir(preparationResult.pathCfg.publishPath)
+    //fsUtils.createAllButLastSubDir(preparationResult.pathCfg.publishPath) // TODO fix for s3
 
     withPartCols.write.parquet(preparationResult.pathCfg.publishPath)
 
@@ -55,11 +55,11 @@ object DynamicInterpreter {
     implicit val interpreterContext: InterpreterContext = InterpreterContext(inputDf.schema, conformance,
       featureSwitches, jobShortName, spark, dao, InterpreterContextArgs.fromConformanceConfig(progArgs))
 
-    applyCheckpoint(inputDf, "Start")
+    // applyCheckpoint(inputDf, "Start") // TODO fix for s3
 
     val conformedDf = applyConformanceRules(ensureErrorColumnExists(inputDf))
 
-    applyCheckpoint(conformedDf, "End")
+    // applyCheckpoint(conformedDf, "End") // TODO fix for s3
     logExecutionPlan(conformedDf)
 
     conformedDf