
#1015 Refactoring Conformance and Standardization #1377

Merged

Conversation

@AdrianOlosutean (Contributor) commented Jun 8, 2020

Closes #1015
Closes #1231
Closes #1232

Main changes:

  1. Extracted common configuration fields into JobCmdConfig, which is contained in StdCmdConfig and ConfCmdConfig
  2. Moved functionality common to Standardization and Conformance into CommonJobExecution
  3. Moved job-specific logic into (Standardization/Conformance)Reader, which reads job-specific configuration, and (Standardization/Conformance)Execution, which contains the execution logic

One open question I am not sure how to tackle: how should the configuration be handled for the future job that runs Standardization and Conformance together?
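To make point 1 concrete, here is a rough sketch of how the composition could look. The field names are only illustrative, taken from the CLI options shown further down in this conversation; they are not the exact case classes in this PR.

// Illustrative sketch only; field names are assumptions based on the CLI options below.
case class JobCmdConfig(
  datasetName: String = "",
  datasetVersion: Int = 1,
  reportDate: String = "",
  reportVersion: Option[Int] = None,
  menasCredentialsFile: Option[String] = None,
  menasAuthKeytab: Option[String] = None
)

// Each job-specific config embeds the common part instead of duplicating its fields.
case class StdCmdConfig(
  jobConfig: JobCmdConfig = JobCmdConfig(),
  rawFormat: String = "csv"
)

case class ConfCmdConfig(
  jobConfig: JobCmdConfig = JobCmdConfig(),
  experimentalMappingRule: Option[Boolean] = None
)

With this shape, CommonJobExecution (point 2) presumably only needs to know about JobCmdConfig, while the (Standardization/Conformance)Execution traits work with their own config types.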

@Zejnilovic (Contributor) left a comment:

Current code is not runnable. CmdConfig cannot be extended like this (see this comment).

@AdrianOlosutean changed the title from "#1307 Refactoring Conformance and Standardization" to "#1015 Refactoring Conformance and Standardization" on Jun 10, 2020
@AdrianOlosutean marked this pull request as ready for review on June 15, 2020 11:12
@yruslan (Contributor) left a comment:

Looks good. Let's try this on the cluster.

Would the program still complain if an invalid argument is passed?


import scala.collection.immutable.HashMap

class StandardizationReader(log: Logger) {
Contributor:

Maybe get a logger from the factory?

private val log = LoggerFactory.getLogger(this.getClass)

Then we can convert this class to an object.

Contributor (Author):

It can be an object, yes. I wanted it to be consistent with ConformanceReader, which I think should be a class.

Contributor:

Yeah, it makes sense to leave it as a class. But what about the logger creation?

Contributor:

I am with @yruslan on this one. I cannot wrap my head around having to pass a logger to this class.
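A minimal sketch of what this thread converges on: keep StandardizationReader a class, but let it create its own logger via the factory instead of receiving one through the constructor.

import org.slf4j.{Logger, LoggerFactory}

// The logger is created via the factory, so nothing needs to be passed in.
class StandardizationReader {
  private val log: Logger = LoggerFactory.getLogger(this.getClass)

  // ... reading of Standardization-specific configuration goes here
}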

@Zejnilovic (Contributor) left a comment:

Notice --dataset-versiona vs --dataset-version: I mistyped one parameter. An error was printed, but the job did not die on it; it only failed later, on authentication.

spark-submit \
--class za.co.absa.enceladus.standardization.StandardizationJob \
spark-jobs-2.7.0-SNAPSHOT.jar \
--menas-credentials-file menas-credential.properties \
--dataset-name SmallCSV \
--dataset-versiona 10 \
--report-date 2020-01-01 \
--report-version 1 \
--raw-format csv --header true
Error: Missing option --dataset-version
Usage: spark-submit [spark options] StandardizationBundle.jar [options]

  -D, --dataset-name <value>
                           Dataset name
  -d, --dataset-version <value>
                           Dataset version
  -R, --report-date <value>
                           Report date in 'yyyy-MM-dd' format
  -r, --report-version <value>
                           Report version. If not provided, it is inferred based on the publish path (it's an EXPERIMENTAL feature)
  --menas-auth-keytab <value>
                           Path to keytab file used for authenticating to menas
  --performance-file <value>
                           Produce a performance metrics file at the given location (local filesystem)
  --folder-prefix <value>  Adds a folder prefix before the infoDateColumn
  --persist-storage-level <value>
                           Specifies persistence storage level to use when processing data. Spark's default is MEMORY_AND_DISK.
20/06/15 20:21:32 INFO StandardizationJob$: Enceladus version 2.7.0-SNAPSHOT
20/06/15 20:21:32 INFO SparkContext: Running Spark version 2.4.4
20/06/15 20:21:32 INFO SparkContext: Submitted application: Standardisation 2.7.0-SNAPSHOT  1
20/06/15 20:21:32 INFO SecurityManager: Changing view acls to: XXX
20/06/15 20:21:32 INFO SecurityManager: Changing modify acls to: XXX
20/06/15 20:21:32 INFO SecurityManager: Changing view acls groups to:
20/06/15 20:21:32 INFO SecurityManager: Changing modify acls groups to:
...
20/06/15 20:21:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" za.co.absa.enceladus.dao.UnauthorizedException: No Menas credentials provided
	at za.co.absa.enceladus.dao.rest.AuthClient$.apply(AuthClient.scala:32)
	at za.co.absa.enceladus.dao.rest.RestDaoFactory$.getInstance(RestDaoFactory.scala:26)
	at za.co.absa.enceladus.standardization.StandardizationJob$.main(StandardizationJob.scala:47)
	at za.co.absa.enceladus.standardization.StandardizationJob.main(StandardizationJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/06/15 20:21:37 INFO SparkContext: Invoking stop() from shutdown hook
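For illustration, one way to make a mistyped option fatal with scopt 4 is to treat a failed parse as a hard stop before any Menas call is attempted. This is only a sketch with a hypothetical, minimal option set, not the PR's actual parser.

import scopt.OParser

final case class Cmd(datasetName: String = "", datasetVersion: Int = 1)

object FailFastParsing {
  private val builder = OParser.builder[Cmd]
  private val parser = {
    import builder._
    OParser.sequence(
      programName("StandardizationBundle.jar"),
      opt[String]('D', "dataset-name").required().action((v, c) => c.copy(datasetName = v)),
      opt[Int]('d', "dataset-version").required().action((v, c) => c.copy(datasetVersion = v))
    )
  }

  // scopt has already printed the error and usage text at this point;
  // the job just has to stop instead of carrying on to authentication.
  def parse(args: Array[String]): Cmd =
    OParser.parse(parser, args, Cmd()).getOrElse(sys.exit(1))
}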

@Zejnilovic (Contributor) left a comment:

Here is the first part. I like the usage of scopt.

// Mutually exclusive parameters don't work in scopt 4
if (args.contains("--menas-credentials-file") && args.contains("--menas-auth-keytab")) {
  println("ERROR: Only one authentication method is allowed at a time")
  System.exit(1)
}
Contributor:

Throw some exception or return a Try[ConformanceCmdConfig] (that was a CQC consensus for this in Herme)
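A sketch of that suggestion, written generically so the same check could serve either config type; everything except the argument check itself is an assumption, not the PR's code.

import scala.util.{Failure, Success, Try}

// Surface the conflict as a Failure instead of printing and calling System.exit,
// so the caller decides whether to abort, log, or recover.
def checkAuthMethods[C](args: Array[String], parsed: C): Try[C] =
  if (args.contains("--menas-credentials-file") && args.contains("--menas-auth-keytab")) {
    Failure(new IllegalArgumentException("Only one authentication method is allowed at a time"))
  } else {
    Success(parsed)
  }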

Contributor (Author):

I would do this in #1392.

Contributor:

ok

Collaborator:

Sorry, why only there? This is a new class. I don't think it adds much to do it right from the start.

Contributor:

@benedeki sorry, is this still valid?

@Zejnilovic (Contributor) left a comment:

Code reviewed. Nice work. I have mostly small comments.

@yruslan (Contributor) left a comment:

Looked through the changes in the last commits. Looks good, if we are fine with this approach in general.

I had a couple of doubts about how this approach will work for the new job that combines both Standardization and Conformance. But after a discussion with @AdrianOlosutean, I can see it should work.

dk1844 previously approved these changes Jul 7, 2020
@dk1844 (Contributor) left a comment:

(read the code, checked out, built, did not run)

val enabled = cmdParameterOpt match {
  case Some(b) => b
  case None =>
    if (conf.hasPath(configKey)) {
Contributor:

I think the current version of the ConfigReader.readStringConfig can be reused here.


private def isExperimentalRuleEnabled()(implicit cmd: ConformanceConfig): Boolean = {
  val enabled = getCmdOrConfigBoolean(cmd.experimentalMappingRule,
    "conformance.mapping.rule.experimental.implementation",
Contributor:

How about extracting these keys into some constants? (And the same below.)
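One possible shape for that, only as a sketch: the key is copied from the snippet above, while the object name is made up.

object ConformanceConfigKeys {
  // Collecting the Typesafe Config keys in one place avoids repeating string literals.
  val ExperimentalMappingRule: String = "conformance.mapping.rule.experimental.implementation"
}

// The call above would then read:
// getCmdOrConfigBoolean(cmd.experimentalMappingRule, ConformanceConfigKeys.ExperimentalMappingRule, ...)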

@@ -159,7 +159,7 @@
   <mongo.java.driver.version>3.6.4</mongo.java.driver.version>
   <mockito.version>2.10.0</mockito.version>
   <spark.xml.version>0.5.0</spark.xml.version>
-  <scopt.version>3.7.0</scopt.version>
+  <scopt.version>4.0.0-RC2</scopt.version>
Contributor:

BTW, are any of the v4+ features actually being used?

Contributor (Author):

Yes, indeed: the feature for combining parsers.
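For context, a minimal sketch of that scopt 4 feature: OParser values compose, so a parser for the shared options can be built once and combined with each job-specific parser. The option set and names here are illustrative, not the PR's full parser.

import scopt.OParser

final case class StdConfig(datasetName: String = "", rawFormat: String = "")

object StdParsers {
  private val builder = OParser.builder[StdConfig]
  import builder._

  // Options shared by Standardization and Conformance.
  private val jobParser = OParser.sequence(
    opt[String]('D', "dataset-name").required().action((v, c) => c.copy(datasetName = v))
  )

  // Options specific to Standardization.
  private val standardizationParser = OParser.sequence(
    opt[String]("raw-format").required().action((v, c) => c.copy(rawFormat = v))
  )

  // The combined parser accepts both groups of options.
  val combined = OParser.sequence(jobParser, standardizationParser)
}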

…ct-common-standardization-conformance

# Conflicts:
#	spark-jobs/src/main/scala/za/co/absa/enceladus/conformance/DynamicConformanceJob.scala
#	spark-jobs/src/main/scala/za/co/absa/enceladus/standardization/StandardizationJob.scala
private val confReader: ConfigReader = new ConfigReader(conf)
private val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
private final val SparkCSVReaderMaxColumnsDefault: Int = 20480
object StandardizationJob extends StandardizationExecution {
Contributor:

I am trying to run the PR, but there is a problem with running Standardization:

20/07/13 13:17:54 INFO core.SparkLineageInitializer$: Spline successfully initialized. Spark Lineage tracking is ENABLED.
Exception in thread "main" java.lang.IllegalStateException: Control framework tracking is not initialized.
	at za.co.absa.atum.core.Atum$.preventNotInitialized(Atum.scala:228)
	at za.co.absa.atum.core.Atum$.setAllowUnpersistOldDatasets(Atum.scala:122)
	at za.co.absa.enceladus.common.CommonJobExecution$class.prepareJob(CommonJobExecution.scala:100)
	at za.co.absa.enceladus.standardization.StandardizationJob$.prepareJob(StandardizationJob.scala:26)
	at za.co.absa.enceladus.standardization.StandardizationJob$.main(StandardizationJob.scala:38)
	at za.co.absa.enceladus.standardization.StandardizationJob.main(StandardizationJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Contributor (Author):

True, found the missing line.

fsUtils: FileSystemVersionUtils,
spark: SparkSession): StructType = {
// Enable Menas plugin for Control Framework
MenasPlugin.enableMenas(
Contributor:

Another problem with the Standardization job.

20/07/13 14:27:12 INFO core.SparkLineageInitializer$: Spline successfully initialized. Spark Lineage tracking is ENABLED.
Exception in thread "main" java.lang.NullPointerException
	at za.co.absa.atum.core.Atum$.addEventListener(Atum.scala:223)
	at za.co.absa.atum.plugins.PluginManager$.loadPlugin(PluginManager.scala:22)
	at za.co.absa.enceladus.common.plugin.menas.MenasPlugin$.enableMenas(MenasPlugin.scala:55)
	at za.co.absa.enceladus.standardization.StandardizationExecution$class.prepareStandardization(StandardizationExecution.scala:56)
	at za.co.absa.enceladus.standardization.StandardizationJob$.prepareStandardization(StandardizationJob.scala:26)
	at za.co.absa.enceladus.standardization.StandardizationJob$.main(StandardizationJob.scala:39)
	at za.co.absa.enceladus.standardization.StandardizationJob.main(StandardizationJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

HuvarVer previously approved these changes Jul 14, 2020
@HuvarVer (Contributor) left a comment:

Pulled, built, ran, and ran the e2e tests.
Now it looks like it works correctly.

Zejnilovic previously approved these changes Jul 14, 2020
@Zejnilovic (Contributor) left a comment:

Comments implemented

@benedeki (Collaborator) left a comment:

Just a few less important remarks.

import za.co.absa.enceladus.utils.config.ConfigUtils.ConfigImplicits
import za.co.absa.enceladus.conformance.interpreter.{FeatureSwitches, ThreeStateSwitch}

class PropertiesProvider {
Collaborator:

What's the purpose of this class?

Contributor (Author):

It reads the properties from the configuration.

Collaborator:

With that I meant: maybe add a short comment on the purpose of the class (not to be confused with ConformanceConfig, for example) 😉
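Something along these lines, for example; the wording is only a suggestion based on the keys and imports visible in this PR.

/**
  * Reads Conformance-related settings (feature switches, autoclean options, ...)
  * from the application configuration.
  * Not to be confused with ConformanceConfig, which holds the parsed command-line arguments.
  */
class PropertiesProvider {
  // ...
}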

private val log: Logger = LoggerFactory.getLogger(this.getClass)
private implicit val conf: Config = ConfigFactory.load()

private val standardizedHdfsFolderKey = "conformance.autoclean.standardized.hdfs.folder"
Collaborator:

IMHO these private vals belong in the companion object.
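A sketch of that move, assuming the surrounding class is the PropertiesProvider shown above; the method name is made up, purely to show where the constant ends up being used.

import com.typesafe.config.{Config, ConfigFactory}
import org.slf4j.{Logger, LoggerFactory}

object PropertiesProvider {
  // Constants that do not depend on instance state can live in the companion object.
  private val StandardizedHdfsFolderKey = "conformance.autoclean.standardized.hdfs.folder"
}

class PropertiesProvider {
  private val log: Logger = LoggerFactory.getLogger(this.getClass)
  private implicit val conf: Config = ConfigFactory.load()

  // Instance code then refers to the constant through the companion object.
  def hasStandardizedHdfsFolderSetting: Boolean =
    conf.hasPath(PropertiesProvider.StandardizedHdfsFolderKey)
}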


import scala.collection.immutable.HashMap

class PropertiesProvider {
Collaborator:

Same thing.
It's also confusing to have two classes with the same name, just a different path and similar function, but no relationship.

Contributor (Author):

They used to be called Readers, and it seems they can have the same name in different packages because each is only used within its own package.

Collaborator:

I know it does work, but it's still confusing IMHO. 🤔

@AdrianOlosutean dismissed stale reviews from Zejnilovic and HuvarVer via a0a2be5 on July 14, 2020 15:12

sonarcloud bot commented Jul 14, 2020

Kudos, SonarCloud Quality Gate passed!

Bugs: A (0 bugs)
Vulnerabilities: A (0 vulnerabilities, 0 security hotspots to review)
Code smells: A (16 code smells)

No coverage information
0.0% duplication

@AdrianOlosutean merged commit f925efe into develop on Jul 14, 2020
@AdrianOlosutean deleted the feature/1015-extract-common-standardization-conformance branch on July 14, 2020 15:58