#1015 Refactoring Conformance and Standardization #1377

Merged

Changes from 2 commits (35 commits in total)

Commits:
6c015f4
Merge branch 'develop'
benedeki Apr 27, 2020
47dfb7b
Merge branch 'develop'
benedeki May 15, 2020
1a9e6db
Merge branch 'develop'
benedeki May 29, 2020
0954f90
#1307 Refactoring started: changed config, separated logic from the j…
Adrian-Olosutean Jun 8, 2020
6456011
#1307 Moved CommonJobExecution to common package
Adrian-Olosutean Jun 8, 2020
d1298d0
#1015 Header + other small improvements
Adrian-Olosutean Jun 10, 2020
0d60836
#1015 Nested configurations
Adrian-Olosutean Jun 10, 2020
f12f717
#1015 other changes
Adrian-Olosutean Jun 11, 2020
e98c1af
Merge branch 'develop'
benedeki Jun 12, 2020
8375390
#1015 Renamed config classes
Adrian-Olosutean Jun 15, 2020
37f99b0
#1015 scopt4 approach
Adrian-Olosutean Jun 18, 2020
f14d299
#1015 cleaned leftover code
Adrian-Olosutean Jun 18, 2020
b1bd2c9
Merge remote-tracking branch 'origin/master' into feature/1015-extrac…
Adrian-Olosutean Jun 18, 2020
81e9077
Merge remote-tracking branch 'origin/develop' into feature/1015-extra…
Adrian-Olosutean Jun 18, 2020
1bf4c3c
#1015 Merging fix
Adrian-Olosutean Jun 18, 2020
38b8be2
#1015 Small improvements
Adrian-Olosutean Jun 19, 2020
9fbb264
#1015 Updated for cobol is-text
Adrian-Olosutean Jun 19, 2020
618fbfe
1015: Refactoring Conformance and Standardization
benedeki Jun 22, 2020
01fff21
1015: Refactoring Conformance and Standardization
benedeki Jun 24, 2020
9fcc96e
Merge remote-tracking branch 'origin/develop' into feature/1015-extra…
Adrian-Olosutean Jun 26, 2020
2c62535
#1015 Integrate fixed-width
Adrian-Olosutean Jun 26, 2020
d31637d
#1015 Added missing header
Adrian-Olosutean Jun 26, 2020
a05e72c
Merge branch 'develop' into feature/1015-extract-common-standardizati…
AdrianOlosutean Jun 29, 2020
9b7a710
#1015 Implemented PR comments
Adrian-Olosutean Jun 29, 2020
ae7bf30
#1015 Other feedback
Adrian-Olosutean Jun 30, 2020
6ffb789
#1015 Renamed to JobConfigParser
Adrian-Olosutean Jul 2, 2020
ce015ce
Merge branch 'develop' into feature/1015-extract-common-standardizati…
AdrianOlosutean Jul 2, 2020
44e64a5
Merge remote-tracking branch 'origin/develop' into feature/1015-extra…
Adrian-Olosutean Jul 13, 2020
5544ed7
#1015 Conflict resolution
Adrian-Olosutean Jul 13, 2020
6556256
#1015 Fixes
Adrian-Olosutean Jul 13, 2020
02ce494
#1015 Moved Atum control framework performance optimization
Adrian-Olosutean Jul 13, 2020
01167e9
#1015 Import reordering
Adrian-Olosutean Jul 13, 2020
d5a8f1f
#1015 Small changes
Adrian-Olosutean Jul 13, 2020
a0a2be5
#1015 Renamed PropertiesProviders and added comments
Adrian-Olosutean Jul 14, 2020
0d8764f
Merge branch 'develop' into feature/1015-extract-common-standardizati…
AdrianOlosutean Jul 14, 2020
@@ -0,0 +1,194 @@
package za.co.absa.enceladus.common

import java.text.MessageFormat
import java.time.Instant

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.atum.AtumImplicits
import za.co.absa.atum.core.Atum
import za.co.absa.enceladus.common.plugin.PostProcessingService
import za.co.absa.enceladus.common.plugin.menas.{MenasPlugin, MenasRunUrl}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.rest.MenasConnectionStringParser
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams.ErrorSourceId
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.general.ProjectMetadataTools
import za.co.absa.enceladus.utils.performance.PerformanceMeasurer
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
 * Logic shared by the Standardization and Conformance Spark jobs: Spark session setup,
 * report-version resolution, control-framework and plugin initialization, and
 * post-processing of the job output.
 */
trait CommonJobExecution {
  TimeZoneNormalizer.normalizeJVMTimeZone()

  protected val log: Logger = LoggerFactory.getLogger(this.getClass)
  protected val conf = ConfigFactory.load()
  protected val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))

  protected def standardizationPath(jobConfig: JobCmdConfig, reportVersion: Int): String =
    MessageFormat.format(conf.getString("standardized.hdfs.path"),
      jobConfig.datasetName,
      jobConfig.datasetVersion.toString,
      jobConfig.reportDate,
      reportVersion.toString)

  protected def getReportVersion(jobConfig: JobCmdConfig, dataset: Dataset)
                                (implicit fsUtils: FileSystemVersionUtils): Int = {
    jobConfig.reportVersion match {
      case Some(version) => version
      case None =>
        // Infer the next report version as (latest version found in the publish path) + 1,
        // e.g. if versions 1 and 2 exist for the report date, version 3 is inferred
        val newVersion = fsUtils.getLatestVersion(dataset.hdfsPublishPath, jobConfig.reportDate) + 1
        log.warn(s"Report version not provided, inferred report version: $newVersion")
        log.warn("This is an EXPERIMENTAL feature.")
        log.warn(" -> It can lead to issues when running multiple jobs on a dataset concurrently.")
        log.warn(" -> It may not work as desired when there are gaps in the versions of the data being landed.")
        newVersion
    }
  }

  protected def obtainSparkSession()(implicit cmd: JobCmdConfig): SparkSession = {
    val enceladusVersion = ProjectMetadataTools.getEnceladusVersion
    log.info(s"Enceladus version $enceladusVersion")
    val reportVersion = cmd.reportVersion.map(_.toString).getOrElse("")
    val spark = SparkSession.builder()
      .appName(s"Standardisation $enceladusVersion ${cmd.datasetName} ${cmd.datasetVersion} ${cmd.reportDate} $reportVersion")
      .getOrCreate()
    TimeZoneNormalizer.normalizeSessionTimeZone(spark)
    spark
  }

  protected def initPerformanceMeasurer(path: String)
                                       (implicit spark: SparkSession, fsUtils: FileSystemVersionUtils): PerformanceMeasurer = {
    // Initialize the measurer with the size of the directory at the given path
    val performance = new PerformanceMeasurer(spark.sparkContext.appName)
    val stdDirSize = fsUtils.getDirectorySize(path)
    performance.startMeasurement(stdDirSize)
    performance
  }

  protected def handleControlInfoValidation(): Unit = {
    ControlInfoValidation.addRawAndSourceRecordCountsToMetadata() match {
      case Failure(ex: za.co.absa.enceladus.utils.validation.ValidationException) =>
        val confEntry = "control.info.validation"
        conf.getString(confEntry) match {
          case "strict" => throw ex
          case "warning" => log.warn(ex.msg)
          case "none" =>
          case _ => throw new RuntimeException(s"Invalid $confEntry value")
        }
      case Failure(ex) => throw ex
      case Success(_) =>
    }
  }
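
  // The strictness applied above comes from the "control.info.validation" entry of the
  // Typesafe (HOCON) configuration, e.g. in application.conf; the value shown is only
  // an illustration:
  //
  //   control.info.validation = "warning"   // accepted values: "strict", "warning", "none"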

  protected def initFunctionalExtensions(reportVersion: Int,
                                         pathCfg: PathCfg,
                                         isJobStageOnly: Boolean = false,
                                         generateNewRun: Boolean = false)
                                        (implicit spark: SparkSession, dao: MenasDAO,
                                         jobConfig: JobCmdConfig, step: String): Unit = {
    // Enable Spline
    import za.co.absa.spline.core.SparkLineageInitializer._
    spark.enableLineageTracking()

    // Enable Control Framework
    import za.co.absa.atum.AtumImplicits.SparkSessionWrapper
    spark.enableControlMeasuresTracking(s"${pathCfg.inputPath}/_INFO")
      .setControlMeasuresWorkflow(step)

    // Enable control framework performance optimization for pipeline-like jobs
    Atum.setAllowUnpersistOldDatasets(true)

    // Enable non-default persistence storage level if provided in the command line
    jobConfig.persistStorageLevel.foreach(Atum.setCachingStorageLevel)

    // Enable Menas plugin for Control Framework
    MenasPlugin.enableMenas(
      conf,
      jobConfig.datasetName,
      jobConfig.datasetVersion,
      jobConfig.reportDate,
      reportVersion,
      isJobStageOnly,
      generateNewRun)
  }

  protected def validateForExistingOutputPath(fsUtils: FileSystemVersionUtils, pathCfg: PathCfg): Unit = {
    if (fsUtils.hdfsExists(pathCfg.outputPath)) {
      throw new IllegalStateException(
        s"Path ${pathCfg.outputPath} already exists. Increment the run version, or delete ${pathCfg.outputPath}"
      )
    }
  }

  protected def writePerformanceMetrics(performance: PerformanceMeasurer, jobCmdConfig: JobCmdConfig): Unit = {
    jobCmdConfig.performanceMetricsFile.foreach { fileName =>
      try {
        performance.writeMetricsToFile(fileName)
      } catch {
        case NonFatal(e) => log.error(s"Unable to write performance metrics to file '$fileName': ${e.getMessage}")
      }
    }
  }

  protected def handleEmptyOutputAfterStep()(implicit spark: SparkSession, step: String): Unit = {
    import za.co.absa.atum.core.Constants._

    val areCountMeasurementsAllZero = Atum.getControlMeasure.checkpoints
      .flatMap(checkpoint =>
        checkpoint.controls.filter(control =>
          control.controlName.equalsIgnoreCase(controlTypeRecordCount)))
      .forall(m => Try(m.controlValue.toString.toDouble).toOption.contains(0D))

    if (areCountMeasurementsAllZero) {
      log.warn(s"Empty output after running $step. Previous checkpoints show this is correct.")
    } else {
      val errMsg = s"Empty output after running $step, while previous checkpoints show a non-zero record count"
      AtumImplicits.SparkSessionWrapper(spark).setControlMeasurementError(step, errMsg, "")
      throw new IllegalStateException(errMsg)
    }
  }

  def runPostProcessors(errorSourceId: ErrorSourceId.Value, pathCfg: PathCfg, jobCmdConfig: JobCmdConfig, reportVersion: Int)
                       (implicit spark: SparkSession, fileSystemVersionUtils: FileSystemVersionUtils): Unit = {
    val df = spark.read.parquet(pathCfg.outputPath)
    val runId = MenasPlugin.runNumber

    if (runId.isEmpty) {
      log.warn("No run number found, the Run URL cannot be properly reported!")
    }

    // Reporting the UI URL(s); if there is more than one, they are comma-separated
    val runUrl: Option[String] = runId.map { runNumber =>
      menasBaseUrls.map { menasBaseUrl =>
        MenasRunUrl.getMenasUiRunUrl(menasBaseUrl, jobCmdConfig.datasetName, jobCmdConfig.datasetVersion, runNumber)
      }.mkString(",")
    }

    val sourceSystem = Atum.getControlMeasure.metadata.sourceApplication
    val uniqueRunId = Atum.getControlMeasure.runUniqueId

    val params = ErrorSenderPluginParams(jobCmdConfig.datasetName,
      jobCmdConfig.datasetVersion, jobCmdConfig.reportDate, reportVersion, pathCfg.outputPath,
      errorSourceId, sourceSystem, runUrl, runId, uniqueRunId, Instant.now)
    val postProcessingService = PostProcessingService(conf, params)
    postProcessingService.onSaveOutput(df)
  }

  protected def executePostStep(jobConfig: JobCmdConfig): Unit = {
    val name = jobConfig.datasetName
    val version = jobConfig.datasetVersion
    MenasPlugin.runNumber.foreach { runNumber =>
      menasBaseUrls.foreach { menasBaseUrl =>
        val apiUrl = MenasRunUrl.getMenasApiRunUrl(menasBaseUrl, name, version, runNumber)
        val uiUrl = MenasRunUrl.getMenasUiRunUrl(menasBaseUrl, name, version, runNumber)

        log.info(s"Menas API Run URL: $apiUrl")
        log.info(s"Menas UI Run URL: $uiUrl")
      }
    }
  }
}
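
For orientation, here is a minimal sketch (not part of this diff) of how a concrete job
object could mix the trait in; the object name and the call sequence are illustrative
assumptions, not the project's actual job classes:

import org.apache.spark.sql.SparkSession

object StandardizationJobSketch extends CommonJobExecution {
  def main(args: Array[String]): Unit = {
    // Parse the shared command-line options; the step name feeds the usage header
    implicit val cmd: JobCmdConfig = JobCmdConfig.getCmdLineArguments(args, "Standardization")
    implicit val spark: SparkSession = obtainSparkSession()
    // ... job-specific work: resolve dataset and report version, validate the output
    // path, run the transformation, write metrics and run post-processors ...
  }
}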
@@ -0,0 +1,117 @@
/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.enceladus.common

import org.apache.spark.storage.StorageLevel
import scopt.OptionParser
import za.co.absa.enceladus.dao.auth.{InvalidMenasCredentialsFactory, MenasCredentialsFactory, MenasKerberosCredentialsFactory, MenasPlainCredentialsFactory}

import scala.util.matching.Regex

case class JobCmdConfig(args: Array[String] = Array.empty,
                        datasetName: String = "",
                        datasetVersion: Int = 1,
                        reportDate: String = "",
                        reportVersion: Option[Int] = None,
                        menasCredentialsFactory: MenasCredentialsFactory = InvalidMenasCredentialsFactory,
                        performanceMetricsFile: Option[String] = None,
                        folderPrefix: Option[String] = None,
                        persistStorageLevel: Option[StorageLevel] = None)

object JobCmdConfig {
  def getCmdLineArguments(args: Array[String], step: String): JobCmdConfig = {
    val parser = new CmdParser(s"spark-submit [spark options] ${step}Bundle.jar")

    val optionCmd: Option[JobCmdConfig] = parser.parse(args, JobCmdConfig(args))

    // On a parse failure scopt has already reported the errors; fall back to an empty config
    optionCmd.getOrElse(JobCmdConfig())
  }

  private class CmdParser(programName: String) extends OptionParser[JobCmdConfig](programName) {
    override def errorOnUnknownArgument = false

    opt[String]('D', "dataset-name").required().action((value, config) =>
      config.copy(datasetName = value)).text("Dataset name")

    opt[Int]('d', "dataset-version").required().action((value, config) =>
      config.copy(datasetVersion = value)).text("Dataset version")
      .validate(value =>
        if (value > 0) {
          success
        } else {
          failure("Option --dataset-version must be >0")
        })

    val reportDateMatcher: Regex = "^\\d{4}-\\d{2}-\\d{2}$".r
    opt[String]('R', "report-date").required().action((value, config) =>
      config.copy(reportDate = value)).text("Report date in 'yyyy-MM-dd' format")
      .validate(value =>
        reportDateMatcher.findFirstIn(value) match {
          case None => failure(s"Match error in '$value'. Option --report-date expects a date in 'yyyy-MM-dd' format")
          case _ => success
        })

    opt[Int]('r', "report-version").optional().action((value, config) =>
      config.copy(reportVersion = Some(value)))
      .text("Report version. If not provided, it is inferred based on the publish path (it's an EXPERIMENTAL feature)")
      .validate(value =>
        if (value > 0) {
          success
        } else {
          failure("Option --report-version must be >0")
        })

    private var credsFile: Option[String] = None
    private var keytabFile: Option[String] = None

    opt[String]("menas-credentials-file").hidden.optional().action({ (file, config) =>
      credsFile = Some(file)
      config.copy(menasCredentialsFactory = new MenasPlainCredentialsFactory(file))
    }).text("Path to Menas credentials config file.").validate(path =>
      if (keytabFile.isDefined) {
        failure("Only one authentication method is allowed at a time")
      } else {
        success
      })

opt[String]("menas-auth-keytab").optional().action({ (file, config) =>
keytabFile = Some(file)
config.copy(menasCredentialsFactory = new MenasKerberosCredentialsFactory(file))
}).text("Path to keytab file used for authenticating to menas").validate({ file =>
if (credsFile.isDefined) {
failure("Only one authentication method is allowed at a time")
} else {
success
}
})

opt[String]("performance-file").optional().action((value, config) =>
config.copy(performanceMetricsFile = Option(value))).text("Produce a performance metrics file at the given location (local filesystem)")

opt[String]("folder-prefix").optional().action((value, config) =>
config.copy(folderPrefix = Option(value))).text("Adds a folder prefix before the infoDateColumn")

opt[String]("persist-storage-level").optional().action((value, config) =>
config.copy(persistStorageLevel = Some(StorageLevel.fromString(value))))
.text("Specifies persistence storage level to use when processing data. Spark's default is MEMORY_AND_DISK.")

    checkConfig { config =>
      config.menasCredentialsFactory match {
        case InvalidMenasCredentialsFactory => failure("No authentication method specified (e.g. --menas-auth-keytab)")
        case _ => success
      }
    }
  }
}

/** Input and output locations of a single job run. */
final case class PathCfg(inputPath: String, outputPath: String)
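
Putting it together, a job built on this parser would be launched roughly as follows; the
bundle name, paths and values are illustrative only (the "<step>Bundle.jar" naming comes
from the usage header built in getCmdLineArguments):

spark-submit [spark options] StandardizationBundle.jar \
  --dataset-name MyDataset \
  --dataset-version 1 \
  --report-date 2020-07-14 \
  --report-version 1 \
  --menas-auth-keytab /path/to/user.keytab \
  --persist-storage-level MEMORY_AND_DISK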