#1015 Refactoring Conformance and Standardization #1377

Merged

Commits (35 total; changes shown below are from 31 commits):
6c015f4  Merge branch 'develop'  (benedeki, Apr 27, 2020)
47dfb7b  Merge branch 'develop'  (benedeki, May 15, 2020)
1a9e6db  Merge branch 'develop'  (benedeki, May 29, 2020)
0954f90  #1307 Refactoring started: changed config, separated logic from the j…  (Adrian-Olosutean, Jun 8, 2020)
6456011  #1307 Moved CommonJobExecution to common package  (Adrian-Olosutean, Jun 8, 2020)
d1298d0  #1015 Header + other small improvements  (Adrian-Olosutean, Jun 10, 2020)
0d60836  #1015 Nested configurations  (Adrian-Olosutean, Jun 10, 2020)
f12f717  #1015 other changes  (Adrian-Olosutean, Jun 11, 2020)
e98c1af  Merge branch 'develop'  (benedeki, Jun 12, 2020)
8375390  #1015 Renamed config classes  (Adrian-Olosutean, Jun 15, 2020)
37f99b0  #1015 scopt4 approach  (Adrian-Olosutean, Jun 18, 2020)
f14d299  #1015 cleaned leftover code  (Adrian-Olosutean, Jun 18, 2020)
b1bd2c9  Merge remote-tracking branch 'origin/master' into feature/1015-extrac…  (Adrian-Olosutean, Jun 18, 2020)
81e9077  Merge remote-tracking branch 'origin/develop' into feature/1015-extra…  (Adrian-Olosutean, Jun 18, 2020)
1bf4c3c  #1015 Merging fix  (Adrian-Olosutean, Jun 18, 2020)
38b8be2  #1015 Small improvements  (Adrian-Olosutean, Jun 19, 2020)
9fbb264  #1015 Updated for cobol is-text  (Adrian-Olosutean, Jun 19, 2020)
618fbfe  1015: Refactoring Conformance and Standardization  (benedeki, Jun 22, 2020)
01fff21  1015: Refactoring Conformance and Standardization  (benedeki, Jun 24, 2020)
9fcc96e  Merge remote-tracking branch 'origin/develop' into feature/1015-extra…  (Adrian-Olosutean, Jun 26, 2020)
2c62535  #1015 Integrate fixed-width  (Adrian-Olosutean, Jun 26, 2020)
d31637d  #1015 Added missing header  (Adrian-Olosutean, Jun 26, 2020)
a05e72c  Merge branch 'develop' into feature/1015-extract-common-standardizati…  (AdrianOlosutean, Jun 29, 2020)
9b7a710  #1015 Implemented PR comments  (Adrian-Olosutean, Jun 29, 2020)
ae7bf30  #1015 Other feedback  (Adrian-Olosutean, Jun 30, 2020)
6ffb789  #1015 Renamed to JobConfigParser  (Adrian-Olosutean, Jul 2, 2020)
ce015ce  Merge branch 'develop' into feature/1015-extract-common-standardizati…  (AdrianOlosutean, Jul 2, 2020)
44e64a5  Merge remote-tracking branch 'origin/develop' into feature/1015-extra…  (Adrian-Olosutean, Jul 13, 2020)
5544ed7  #1015 Conflict resolution  (Adrian-Olosutean, Jul 13, 2020)
6556256  #1015 Fixes  (Adrian-Olosutean, Jul 13, 2020)
02ce494  #1015 Moved Atum control framework performance optimization  (Adrian-Olosutean, Jul 13, 2020)
01167e9  #1015 Import reordering  (Adrian-Olosutean, Jul 13, 2020)
d5a8f1f  #1015 Small changes  (Adrian-Olosutean, Jul 13, 2020)
a0a2be5  #1015 Renamed PropertiesProviders and added comments  (Adrian-Olosutean, Jul 14, 2020)
0d8764f  Merge branch 'develop' into feature/1015-extract-common-standardizati…  (AdrianOlosutean, Jul 14, 2020)
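
Note: the "scopt4 approach", "Nested configurations", and "Renamed to JobConfigParser" commits point at moving the jobs' command-line parsing to scopt 4's composable OParser style. As a rough illustration of that style only, with invented names and options that are not taken from the Enceladus codebase:

```scala
import scopt.OParser

// Hypothetical stand-in for a refactored job config; the real
// ConformanceConfig and JobConfigParser live in the Enceladus sources.
case class ExampleJobConfig(datasetName: String = "",
                            datasetVersion: Int = 1,
                            experimentalMappingRule: Boolean = false)

object ExampleJobConfigParser {
  private val builder = OParser.builder[ExampleJobConfig]

  // scopt 4 builds parsers as plain values, so option groups can be
  // nested and shared between jobs such as Standardization and Conformance.
  private val parser = {
    import builder._
    OParser.sequence(
      programName("example-job"),
      opt[String]("dataset-name").required().action((v, c) => c.copy(datasetName = v)),
      opt[Int]("dataset-version").action((v, c) => c.copy(datasetVersion = v)),
      opt[Boolean]("experimental-mapping-rule").action((v, c) => c.copy(experimentalMappingRule = v))
    )
  }

  def parse(args: Array[String]): Option[ExampleJobConfig] =
    OParser.parse(parser, args, ExampleJobConfig())
}
```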

Files changed

@@ -16,7 +16,7 @@
 package za.co.absa.enceladus.examples
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
 import za.co.absa.enceladus.dao.MenasDAO
 import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -41,7 +41,7 @@ object CustomRuleSample1 {
   // scalastyle:off magic.number
   val menasBaseUrls = List("http://localhost:8080/menas")
   val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)
 
   val experimentalMR = true

@@ -17,7 +17,7 @@ package za.co.absa.enceladus.examples
 
 import com.typesafe.config.ConfigFactory
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
 import za.co.absa.enceladus.dao.MenasDAO
 import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -43,7 +43,7 @@ object CustomRuleSample2 {
   val conf = ConfigFactory.load()
   val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
   val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)
 
   val experimentalMR = true

@@ -17,7 +17,7 @@ package za.co.absa.enceladus.examples
 
 import com.typesafe.config.ConfigFactory
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
 import za.co.absa.enceladus.dao.MenasDAO
 import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -38,7 +38,7 @@ object CustomRuleSample3 {
   val conf = ConfigFactory.load()
   val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
   val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)
 
   val experimentalMR = true

@@ -19,7 +19,7 @@ import com.typesafe.config.ConfigFactory
 import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
 import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
 import scopt.OptionParser
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
 import za.co.absa.enceladus.dao.MenasDAO
 import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -142,7 +142,7 @@ object CustomRuleSample4 {
   val conf = ConfigFactory.load()
   val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
   val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/main/resources/user.keytab.example")
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)
 
   val dfReader: DataFrameReader = {

@@ -16,7 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom
 
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.ExplosionState
 import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
 import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -40,7 +40,7 @@ case class UppercaseCustomRuleInterpreter(rule: UppercaseCustomConformanceRule)
   override def conformanceRule: Option[ConformanceRule] = Some(rule)
 
   def conform(df: Dataset[Row])
-             (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
+             (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
     handleArrays(rule.outputColumn, df) { flattened =>
 
       // we have to do this if this rule is to support arrays

@@ -16,7 +16,7 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom
 
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.ExplosionState
 import za.co.absa.enceladus.conformance.interpreter.rules.RuleInterpreter
 import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
@@ -40,7 +40,7 @@ case class StringFuncInterpreter(rule: ColumnFunctionCustomConformanceRule) exte
   override def conformanceRule: Option[ConformanceRule] = Some(rule)
 
   def conform(df: Dataset[Row])
-             (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConfCmdConfig): Dataset[Row] = {
+             (implicit spark: SparkSession, explosionState: ExplosionState, dao: MenasDAO, progArgs: ConformanceConfig): Dataset[Row] = {
     handleArrays(rule.outputColumn, df) { flattened =>
 
       // we have to do this if this rule is to support arrays

@@ -19,7 +19,7 @@ import org.apache.spark.sql
 import org.apache.spark.sql.DataFrame
 import org.scalatest.FunSuite
 import org.scalatest.mockito.MockitoSugar
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
 import za.co.absa.enceladus.dao.MenasDAO
 import za.co.absa.enceladus.model.Dataset
@@ -35,7 +35,7 @@ object TestOutputRow {
 class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase with MockitoSugar {
   import spark.implicits._
 
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)
 
   val experimentalMR = true

@@ -20,7 +20,7 @@ import org.apache.spark.sql
 import org.apache.spark.sql.DataFrame
 import org.scalatest.FunSuite
 import org.scalatest.mockito.MockitoSugar
-import za.co.absa.enceladus.conformance.ConfCmdConfig
+import za.co.absa.enceladus.conformance.config.ConformanceConfig
 import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
 import za.co.absa.enceladus.dao.MenasDAO
 import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
@@ -37,7 +37,7 @@ object XPadTestOutputRow {
 class LpadCustomConformanceRuleSuite extends FunSuite with SparkTestBase with MockitoSugar {
   import spark.implicits._
 
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)
 
   val experimentalMR = true
@@ -185,7 +185,7 @@ class RpadCustomConformanceRuleSuite extends FunSuite with SparkTestBase {
   private val conf = ConfigFactory.load()
   private val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
   private val meansCredentials = MenasKerberosCredentials("user@EXAMPLE.COM", "src/test/resources/user.keytab.example")
-  implicit val progArgs: ConfCmdConfig = ConfCmdConfig() // here we may need to specify some parameters (for certain rules)
+  implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
   implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)
 
   val experimentalMR = true

@@ -18,7 +18,7 @@ package za.co.absa.enceladus.plugins.builtin.errorsender.mq
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.functions.{col, explode, lit, size, struct}
 import org.apache.spark.sql.types.DataTypes
-import org.apache.spark.sql.{DataFrame, Encoders}
+import org.apache.spark.sql.{DataFrame, Encoder, Encoders}
 import za.co.absa.enceladus.plugins.api.postprocessor.PostProcessor
 import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.KafkaConnectionParams
 import za.co.absa.enceladus.plugins.builtin.errorsender.DceError
@@ -27,8 +27,8 @@ import za.co.absa.enceladus.utils.schema.SchemaUtils
 import KafkaErrorSenderPluginImpl._
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
 import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
-import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams.ErrorSourceId
 import za.co.absa.enceladus.utils.error.ErrorMessage.ErrorCodes
+import za.co.absa.enceladus.utils.modules._
 
 import scala.util.{Failure, Success, Try}
 
@@ -87,8 +87,8 @@ case class KafkaErrorSenderPluginImpl(connectionParams: KafkaConnectionParams,
    * @return DF with exploded errors and corresponding to the given error source
    */
   def getIndividualErrors(dataFrame: DataFrame, params: ErrorSenderPluginParams): DataFrame = {
-    implicit val singleErrorStardardizedEncoder = Encoders.product[SingleErrorStardardized]
-    implicit val dceErrorEncoder = Encoders.product[DceError]
+    implicit val singleErrorStardardizedEncoder: Encoder[SingleErrorStardardized] = Encoders.product[SingleErrorStardardized]
+    implicit val dceErrorEncoder: Encoder[DceError] = Encoders.product[DceError]
 
     val allowedErrorCodes = KafkaErrorSenderPluginImpl.errorCodesForSource(params.sourceId)
 
@@ -168,7 +168,7 @@ object KafkaErrorSenderPluginImpl {
       informationDate = Some(reportDate.toLocalDate.toEpochDay.toInt),
       outputFileName = Some(additionalParams.outputPath),
       recordId = recordId,
-      errorSourceId = additionalParams.sourceId.toString,
+      errorSourceId = additionalParams.sourceId.value,
       errorType = singleError.errType,
       errorCode = singleError.errCode,
       errorDescription = singleError.errMsg,
@@ -184,9 +184,9 @@
     }
   }
 
-  def errorCodesForSource(sourceId: ErrorSourceId.Value): Seq[String] = sourceId match {
-    case ErrorSourceId.Standardization => ErrorCodes.standardizationErrorCodes
-    case ErrorSourceId.Conformance => ErrorCodes.conformanceErrorCodes
+  def errorCodesForSource(sourceId: SourcePhase): Seq[String] = sourceId match {
+    case SourcePhase.Standardization => ErrorCodes.standardizationErrorCodes
+    case SourcePhase.Conformance => ErrorCodes.conformanceErrorCodes
   }
 
 }
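
Note on the encoder change in getIndividualErrors above: annotating implicit encoders with an explicit Encoder[T] type keeps implicit resolution predictable and satisfies the usual style rules requiring explicit types on implicit definitions. A small self-contained sketch of the same pattern, using a made-up record type rather than the plugin's classes:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical record, standing in for SingleErrorStardardized/DceError.
final case class ErrorRecord(errType: String, errCode: String)

object EncoderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("encoder-example").getOrCreate()

    // The explicit Encoder[ErrorRecord] annotation mirrors the change above.
    implicit val errorRecordEncoder: Encoder[ErrorRecord] = Encoders.product[ErrorRecord]

    spark.createDataset(Seq(ErrorRecord("stdCastError", "E00000"))).show()
    spark.stop()
  }
}
```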

@@ -17,14 +17,14 @@ package za.co.absa.enceladus.plugins.builtin.errorsender.params
 
 import java.time.Instant
 
-import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams.ErrorSourceId
+import za.co.absa.enceladus.utils.modules.SourcePhase
 
 case class ErrorSenderPluginParams(datasetName: String,
                                    datasetVersion: Int,
                                    reportDate: String,
                                    reportVersion: Int,
                                    outputPath: String,
-                                   sourceId: ErrorSourceId.Value,
+                                   sourceId: SourcePhase,
                                    sourceSystem: String,
                                    runUrls: Option[String],
                                    runId: Option[Int],
@@ -37,11 +37,6 @@ case class ErrorSenderPluginParams(datasetName: String,
 
 object ErrorSenderPluginParams {
 
-  object ErrorSourceId extends Enumeration {
-    val Standardization = Value("standardizaton")
-    val Conformance = Value("conformance")
-  }
-
   object FieldNames {
     val datasetName = "datasetName"
     val datasetVersion = "datasetVersion"
@@ -65,7 +60,7 @@ object ErrorSenderPluginParams {
       reportDate -> params.reportDate,
       reportVersion -> params.reportVersion.toString,
       outputPath -> params.outputPath,
-      sourceId -> params.sourceId.toString,
+      sourceId -> params.sourceId.asIdentifier,
       sourceSystem -> params.sourceSystem,
       processingTimestamp -> params.processingTimestamp.toString
     ) ++
@@ -80,7 +75,7 @@ object ErrorSenderPluginParams {
       reportDate = params(reportDate),
       reportVersion = params(reportVersion).toInt,
       outputPath = params(outputPath),
-      sourceId = ErrorSourceId.withName(params(sourceId)),
+      sourceId = SourcePhase.withIdentifier(params(sourceId)),
       sourceSystem = params(sourceSystem),
       runUrls = params.get(runUrls),
       runId = params.get(runId).map(_.toInt),
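
The ErrorSourceId enumeration removed above is replaced by the shared SourcePhase type from enceladus-utils. Only its surface shows in this diff (value, asIdentifier, SourcePhase.withIdentifier), so the following is a sketch of a type satisfying that contract, not the actual Enceladus source. Note that the diffs keep the old enumeration's misspelled "standardizaton" identifier, as the test expectations below confirm:

```scala
// Sketch only, assuming the members visible in the diff.
sealed trait SourcePhase {
  def value: String                // used for DceError.errorSourceId
  def asIdentifier: String = value // used for the Map[String, String] round trip
}

object SourcePhase {
  case object Standardization extends SourcePhase {
    // The historical "standardizaton" spelling is preserved deliberately.
    val value: String = "standardizaton"
  }
  case object Conformance extends SourcePhase {
    val value: String = "conformance"
  }

  def withIdentifier(identifier: String): SourcePhase = identifier match {
    case "standardizaton" => Standardization
    case "conformance"    => Conformance
    case other            => throw new NoSuchElementException(s"No SourcePhase with identifier: $other")
  }
}
```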

@@ -29,7 +29,7 @@ import za.co.absa.enceladus.plugins.builtin.errorsender.DceError
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPluginSuite.{TestingErrCol, TestingRecord}
 import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
 import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
-import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams.ErrorSourceId
+import za.co.absa.enceladus.utils.modules.SourcePhase
 import za.co.absa.enceladus.utils.testUtils.SparkTestBase
 
 
@@ -65,17 +65,17 @@ class KafkaErrorSenderPluginSuite extends FlatSpec with SparkTestBase with Match
 
   import spark.implicits._
 
-  val testDataDf = testData.toDF
-  val testNow = Instant.now()
+  private val testDataDf = testData.toDF
+  private val testNow = Instant.now()
 
-  val defaultPluginParams = ErrorSenderPluginParams(
+  private val defaultPluginParams = ErrorSenderPluginParams(
     "datasetName1", datasetVersion = 1, "2020-03-30", reportVersion = 1, "output/Path1", null,
     "sourceSystem1", Some("http://runUrls1"), runId = Some(1), Some("uniqueRunId"), testNow)
 
   "ErrorSenderPluginParams" should "getIndividualErrors (exploding, filtering by source for Standardization)" in {
     val plugin = KafkaErrorSenderPluginImpl(null, Map(), Map())
 
-    plugin.getIndividualErrors(testDataDf, defaultPluginParams.copy(sourceId = ErrorSourceId.Standardization))
+    plugin.getIndividualErrors(testDataDf, defaultPluginParams.copy(sourceId = SourcePhase.Standardization))
       .as[DceError].collect.map(entry => (entry.errorType, entry.errorCode)) should contain theSameElementsAs Seq(
       ("stdCastError", "E00000"),
       ("stdNullError", "E00002"),
@@ -87,7 +87,7 @@
   it should "getIndividualErrors (exploding, filtering by source for Conformance)" in {
     val plugin = KafkaErrorSenderPluginImpl(null, Map(), Map())
 
-    plugin.getIndividualErrors(testDataDf, defaultPluginParams.copy(sourceId = ErrorSourceId.Conformance))
+    plugin.getIndividualErrors(testDataDf, defaultPluginParams.copy(sourceId = SourcePhase.Conformance))
       .as[DceError].collect.map(entry => (entry.errorType, entry.errorCode)) should contain theSameElementsAs Seq(
       ("confMapError", "E00001"),
       ("confCastError", "E00003"),
@@ -101,7 +101,7 @@
   val testKafkaUrl = "http://example.com:9092"
   val testSchemaRegUrl = "http://example.com:8081"
 
-  val testConfig = ConfigFactory.empty()
+  private val testConfig = ConfigFactory.empty()
     .withValue("kafka.error.client.id", ConfigValueFactory.fromAnyRef(testClientId))
     .withValue("kafka.error.topic.name", ConfigValueFactory.fromAnyRef(testTopicName))
    .withValue("kafka.bootstrap.servers", ConfigValueFactory.fromAnyRef(testKafkaUrl))
@@ -143,7 +143,7 @@
 
     // onlyConformanceErrorsDataDf should result in 0 std errors
     val onlyConformanceErrorsDataDf = Seq(testData(1)).toDF
-    errorKafkaPlugin.onDataReady(onlyConformanceErrorsDataDf, defaultPluginParams.copy(sourceId = ErrorSourceId.Standardization).toMap)
+    errorKafkaPlugin.onDataReady(onlyConformanceErrorsDataDf, defaultPluginParams.copy(sourceId = SourcePhase.Standardization).toMap)
 
     assert(sendErrorsToKafkaWasCalled == false, "KafkaErrorSenderPluginImpl.sentErrorToKafka should not be called for 0 errors")
   }
@@ -160,11 +160,11 @@
   }
 
   Seq(
-    ErrorSourceId.Standardization -> Seq(
+    SourcePhase.Standardization -> Seq(
      "standardizaton,stdCastError,E00000,Standardization Error - Type cast",
      "standardizaton,stdNullError,E00002,Standardization Error - Null detected in non-nullable attribute"
    ),
-    ErrorSourceId.Conformance -> Seq(
+    SourcePhase.Conformance -> Seq(
      "conformance,confNegErr,E00004,Conformance Negation Error",
      "conformance,confLitErr,E00005,Conformance Literal Error"
    )

@@ -18,17 +18,17 @@ package za.co.absa.enceladus.plugins.builtin.errorsender.params
 import java.time.Instant
 
 import org.scalatest.{FlatSpec, Matchers}
-import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams.ErrorSourceId
+import za.co.absa.enceladus.utils.modules.SourcePhase
 
 class ErrorSenderPluginParamsSuite extends FlatSpec with Matchers {
 
-  val params = ErrorSenderPluginParams(
+  private val params = ErrorSenderPluginParams(
     datasetName = "datasetName1",
     datasetVersion = 1,
     reportDate = "2020-03-30",
     reportVersion = 1,
     outputPath = "output/Path1",
-    sourceId = ErrorSourceId.Conformance,
+    sourceId = SourcePhase.Conformance,
     sourceSystem = "sourceSystem1",
     runUrls = Some("http://runUrls1"),
     runId = Some(1),