#1371 Combined job updated
Adrian-Olosutean committed Jun 20, 2020
1 parent b69c950 commit 603329f
Showing 99 changed files with 1,592 additions and 868 deletions.
2 changes: 1 addition & 1 deletion dao/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion data-model/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
@@ -16,7 +16,7 @@
package za.co.absa.enceladus.examples

import org.apache.spark.sql.{DataFrame, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -41,7 +41,7 @@ object CustomRuleSample1 {
// scalastyle:off magic.number
val menasBaseUrls = List("http://localhost:8080/menas")
val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)

val experimentalMR = true
@@ -17,7 +17,7 @@ package za.co.absa.enceladus.examples

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -43,7 +43,7 @@ object CustomRuleSample2 {
val conf = ConfigFactory.load()
val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)

val experimentalMR = true
@@ -17,7 +17,7 @@ package za.co.absa.enceladus.examples

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -38,7 +38,7 @@ object CustomRuleSample3 {
val conf = ConfigFactory.load()
val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)

val experimentalMR = true
@@ -19,7 +19,7 @@ import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
import scopt.OptionParser
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -142,7 +142,7 @@ object CustomRuleSample4 {
val conf = ConfigFactory.load()
val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)

val dfReader: DataFrameReader = {
@@ -16,7 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -40,7 +40,7 @@ case class UppercaseCustomRuleInterpreter(rule: UppercaseCustomConformanceRule)
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
-            (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
+            (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceCmdConfig): Dataset[Row] = {
handleArrays(rule.outputColumn, df) { flattened =>

// we have to do this if this rule is to support arrays
@@ -16,7 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
-import za.co.absa.enceladus.conformance.{ConfCmdConfig, ConfCmdConfigT}
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.ExplosionState
import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -40,7 +40,7 @@ case class StringFuncInterpreter(rule: ColumnFunctionCustomConformanceRule) exte
override def conformanceRule: Option[ConformanceRule] = Some(rule)

def conform(df: Dataset[Row])
-            (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfigT): Dataset[Row] = {
+            (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceCmdConfig): Dataset[Row] = {
handleArrays(rule.outputColumn, df) { flattened =>

// we have to do this if this rule is to support arrays
@@ -19,7 +19,7 @@ import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.Dataset
@@ -35,7 +35,7 @@ object TestOutputRow {
class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase with MockitoSugar {
import spark.implicits._

-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)

val experimentalMR = true
@@ -20,7 +20,7 @@ import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.ConformanceCmdConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -37,7 +37,7 @@ object XPadTestOutputRow {
class LpadCustomConformanceRuleSuite extends FunSuite with SparkTestBase with MockitoSugar {
import spark.implicits._

-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)

val experimentalMR = true
@@ -185,7 +185,7 @@ class RpadCustomConformanceRuleSuite extends FunSuite with SparkTestBase {
private val conf = ConfigFactory.load()
private val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
private val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/test/resources/user.keytab.example")
-implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+implicit val progArgs: ConformanceCmdConfig = ConformanceCmdConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)

val experimentalMR = true
2 changes: 1 addition & 1 deletion menas/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
@@ -165,7 +165,8 @@ class CastingConformanceRuleForm extends ConformanceRuleForm {
{type: "decimal(38,18)"},
{type: "string"},
{type: "date"},
{type: "timestamp"}
{type: "timestamp"},
{type: "binary"}
]
}

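For context, adding "binary" to this list exposes Spark's BinaryType as a casting target in the Menas UI. A minimal Spark sketch of what such a cast produces (illustrative only — not the conformance engine itself; the object and column names are invented):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CastToBinarySketch extends App {
  val spark = SparkSession.builder().appName("cast-binary-sketch").master("local[*]").getOrCreate()
  import spark.implicits._

  // Casting a string column to binary yields its UTF-8 bytes,
  // roughly what a casting conformance rule targeting "binary" would emit.
  val df = Seq("abc", "xyz").toDF("raw")
  df.withColumn("raw_binary", col("raw").cast("binary")).printSchema()
  // root
  //  |-- raw: string (nullable = true)
  //  |-- raw_binary: binary (nullable = true)
}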
2 changes: 1 addition & 1 deletion migrations-cli/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion migrations/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion plugins-api/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
2 changes: 1 addition & 1 deletion plugins-builtin/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
@@ -70,9 +70,17 @@ class InfoProducerKafka[T](kafkaConnectionParams: KafkaConnectionParams)
logger.info(s"Sending info records ${infoRecord.getClass.getName} to ${kafkaConnectionParams.topicName}...")
val producerRecord = new ProducerRecord[GenericRecord, GenericRecord](kafkaConnectionParams.topicName,
avroKey, avroRecord)

+// A future should be started first, since conversion from Java future to Scala future is asynchronous.
+// We need to make sure the sending process is started so that if close() is called, it will wait for
+// the send to finish.
+val sendFuture = kafkaProducer.send(producerRecord)

// Java Future to Scala Future
+// Note that the conversion doesn't happen immediately since the body is specified by name.
+// That's why we need to actually invoke '.send()' earlier and pass already existing future here.
Future[RecordMetadata] {
-kafkaProducer.send(producerRecord).get
+sendFuture.get
}.onComplete {
case Success(metadata) =>
logger.info(s"Info records were sent successfully to the Kafka topic, offset=${metadata.offset()}.")
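The reasoning in the new comments is subtle enough to merit a standalone illustration: scala.concurrent.Future.apply takes its body by name, so wrapping a Java future inside it postpones the send until a pool thread runs the block. A minimal Scala 2.12+ sketch of the race and the fix (CompletableFuture stands in for the KafkaProducer call; all names here are invented):

import java.util.concurrent.CompletableFuture
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ByNameFutureSketch extends App {
  // Stand-in for kafkaProducer.send(producerRecord): starts work, returns a Java future.
  def send(): CompletableFuture[String] = CompletableFuture.supplyAsync(() => "record-metadata")

  // Race-prone: the by-name body means send() only runs once the execution
  // context schedules it, so a close() right after this line can beat the send.
  val wrappedLazily: Future[String] = Future { send().get }

  // The fix applied in the diff above: start the send eagerly on the calling
  // thread, and wrap only the blocking get() in the Scala future.
  val started = send()
  val wrappedEagerly: Future[String] = Future { started.get }
}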
4 changes: 2 additions & 2 deletions pom.xml
@@ -18,7 +18,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Enceladus</name>
@@ -159,7 +159,7 @@
<mongo.java.driver.version>3.6.4</mongo.java.driver.version>
<mockito.version>2.10.0</mockito.version>
<spark.xml.version>0.5.0</spark.xml.version>
-<scopt.version>3.7.0</scopt.version>
+<scopt.version>4.0.0-RC2</scopt.version>
<kafka.spark.version>0-10</kafka.spark.version>
<abris.version>3.1.1</abris.version>
<spring.version>2.0.0.RELEASE</spring.version>
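One dependency bump worth flagging: scopt moves from 3.7.0 to 4.0.0-RC2. scopt 4 keeps the scopt.OptionParser class seen in CustomRuleSample4 for source compatibility, but its headline feature is a functional OParser builder DSL. A minimal sketch of that DSL, with invented option names purely for illustration:

import scopt.OParser

case class CliConfig(datasetName: String = "", reportVersion: Option[Int] = None)

object Scopt4Sketch {
  private val builder = OParser.builder[CliConfig]
  private val parser = {
    import builder._
    OParser.sequence(
      programName("example-job"),
      opt[String]("dataset-name")
        .required()
        .action((value, cfg) => cfg.copy(datasetName = value)),
      opt[Int]("report-version")
        .action((value, cfg) => cfg.copy(reportVersion = Some(value)))
    )
  }

  // Returns Some(config) on success, None after printing usage on error.
  def parse(args: Array[String]): Option[CliConfig] = OParser.parse(parser, args, CliConfig())
}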
2 changes: 1 addition & 1 deletion spark-jobs/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>za.co.absa.enceladus</groupId>
<artifactId>parent</artifactId>
-<version>2.7.0-SNAPSHOT</version>
+<version>2.8.0-SNAPSHOT</version>
</parent>

<properties>
@@ -44,14 +44,14 @@ trait CommonJobExecution {
protected val conf: Config = ConfigFactory.load()
protected val menasBaseUrls: List[String] = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))

-protected def getStandardizationPath(jobConfig: JobCmdConfig, reportVersion: Int): String =
+protected def getStandardizationPath[T](jobConfig: JobCmdConfig[T], reportVersion: Int): String =
MessageFormat.format(conf.getString("standardized.hdfs.path"),
jobConfig.datasetName,
jobConfig.datasetVersion.toString,
jobConfig.reportDate,
reportVersion.toString)

-protected def getReportVersion(jobConfig: JobCmdConfig, dataset: Dataset)
+protected def getReportVersion[T](jobConfig: JobCmdConfig[T], dataset: Dataset)
(implicit fsUtils: FileSystemVersionUtils): Int = {
jobConfig.reportVersion match {
case Some(version) => version
@@ -65,7 +65,7 @@
}
}

-protected def obtainSparkSession()(implicit cmd: JobCmdConfig): SparkSession = {
+protected def obtainSparkSession[T]()(implicit cmd: JobCmdConfig[T]): SparkSession = {
val enceladusVersion = ProjectMetadataTools.getEnceladusVersion
log.info(s"Enceladus version $enceladusVersion")
val reportVersion = cmd.reportVersion.map(_.toString).getOrElse("")
@@ -99,12 +99,12 @@
}
}

-protected def initFunctionalExtensions(reportVersion: Int,
+protected def initFunctionalExtensions[T](reportVersion: Int,
pathCfg: PathConfig,
isJobStageOnly: Boolean = false,
generateNewRun: Boolean = false)
(implicit spark: SparkSession, dao: MenasDAO,
-                                       jobConfig: JobCmdConfig, step: String): Unit = {
+                                       jobConfig: JobCmdConfig[T], step: String): Unit = {
// Enable Spline
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()
@@ -139,7 +139,7 @@
}
}

-protected def writePerformanceMetrics(performance: PerformanceMeasurer, jobCmdConfig: JobCmdConfig): Unit = {
+protected def writePerformanceMetrics[T](performance: PerformanceMeasurer, jobCmdConfig: JobCmdConfig[T]): Unit = {
jobCmdConfig.performanceMetricsFile.foreach(fileName => try {
performance.writeMetricsToFile(fileName)
} catch {
@@ -165,7 +165,7 @@
}
}

-def runPostProcessors(errorSourceId: ErrorSourceId.Value, pathCfg: PathConfig, jobCmdConfig: JobCmdConfig, reportVersion: Int)
+def runPostProcessors[T](errorSourceId: ErrorSourceId.Value, pathCfg: PathConfig, jobCmdConfig: JobCmdConfig[T], reportVersion: Int)
(implicit spark: SparkSession, fileSystemVersionUtils: FileSystemVersionUtils): Unit = {
val df = spark.read.parquet(pathCfg.outputPath)
val runId = MenasPlugin.runNumber
Expand All @@ -191,7 +191,7 @@ trait CommonJobExecution {
postProcessingService.onSaveOutput(df)
}

-protected def executePostStep(jobConfig: JobCmdConfig): Unit = {
+protected def executePostStep[T](jobConfig: JobCmdConfig[T]): Unit = {
val name = jobConfig.datasetName
val version = jobConfig.datasetVersion
MenasPlugin.runNumber.foreach { runNumber =>
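The common thread in this file: JobCmdConfig gained a type parameter, so every shared helper becomes generic in T, the job-specific part of the configuration. A rough, hypothetical sketch of the shape this enables — the real Enceladus classes carry many more fields, and all names below are invented:

// Hypothetical, simplified stand-in for the real JobCmdConfig[T].
case class JobCmdConfig[T](datasetName: String = "",
                           datasetVersion: Int = 1,
                           reportDate: String = "",
                           reportVersion: Option[Int] = None,
                           jobSpecific: Option[T] = None)

case class ConformanceOptions(experimentalMappingRule: Boolean = false)
case class StandardizationOptions(rawFormat: String = "parquet")

object GenericJobHelperSketch {
  // One helper now serves every job flavour; T is fixed at the call site.
  def describe[T](cfg: JobCmdConfig[T]): String =
    s"${cfg.datasetName} v${cfg.datasetVersion}, report date ${cfg.reportDate}"

  def main(args: Array[String]): Unit = {
    val conformance = JobCmdConfig[ConformanceOptions](datasetName = "trades", reportDate = "2020-06-20")
    val standardization = JobCmdConfig[StandardizationOptions](datasetName = "trades", reportDate = "2020-06-20")
    println(describe(conformance))
    println(describe(standardization))
  }
}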
[Diff truncated: the remaining files of the 99 changed are not shown.]
