
s3 persistence (atum, sdk fs usage, ...) #1526

Merged
merged 27 commits into from
Oct 16, 2020
Changes from 26 commits
Commits
27 commits
d638543
Atum 0.2.7-SNAPSHOT integration: build fix, PathConfig - available to…
dk1844 Aug 31, 2020
3985682
Standardization attempt with S3 Atum, need to update logic in atum's …
dk1844 Sep 2, 2020
c1fe721
Standardization attempt with S3 Atum
dk1844 Sep 3, 2020
2c711e0
PathConfigSuite update
dk1844 Sep 3, 2020
d8d4c6d
Conformance attempt on s3
dk1844 Sep 3, 2020
aded4ba
Atum for s3 snapshot lib update
dk1844 Sep 4, 2020
56faa2a
storer+loader set & working
dk1844 Sep 7, 2020
f616ebf
output+input INFO corrected for Standardization
dk1844 Sep 7, 2020
c0333cb
param naming update
dk1844 Sep 7, 2020
02bb712
Atum for s3 snapshot lib update
dk1844 Sep 7, 2020
b678b1a
FsUtils divided into LocalFsUtils & HdfsUtils
dk1844 Sep 8, 2020
3f3df43
S3FsUtils - first implementation - some methods do not cover paginati…
dk1844 Sep 14, 2020
04d1950
S3FsUtils with tail-recursive pagination accumulation (non-generic)
dk1844 Sep 17, 2020
3e868e2
S3FsUtils with tail-recursive pagination accumulation - now generic w… (a sketch follows the commit list)
dk1844 Sep 17, 2020
690c8f0
comments added
dk1844 Sep 17, 2020
8e69333
recursion for listAndAccumulate split into wrapper to be called and t…
dk1844 Sep 18, 2020
e5bb0b8
HdfsUtils replaced by trait DistributedFsUtils (except for MenasCreden…
dk1844 Sep 18, 2020
b51d050
using final version of s3-powered Atum (3.0.0)
dk1844 Sep 21, 2020
086b3fd
mockito-update version update, scalatest version update
dk1844 Sep 22, 2020
43adcba
S3FsUtilsSuite: exists, read, sizeDir(hidden, non-hidden, recursive)…
dk1844 Sep 22, 2020
5879a44
testrun update
dk1844 Sep 23, 2020
d2080c9
explicit stubbing fix for hyperdrive
dk1844 Sep 29, 2020
2b8dbd6
comment update
dk1844 Sep 29, 2020
0cc7ace
cleanup
dk1844 Sep 30, 2020
81a4a34
PR touchups
dk1844 Sep 30, 2020
34f4800
PR touchups
dk1844 Sep 30, 2020
c529ea4
PR comment touchups, example generalized
dk1844 Oct 5, 2020
@@ -18,7 +18,7 @@ package za.co.absa.enceladus.dao.auth
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import sun.security.krb5.internal.ktab.KeyTab
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.fs.HdfsUtils

sealed abstract class MenasCredentials {
val username: String
@@ -40,9 +40,9 @@ object MenasPlainCredentials {
* @return An instance of Menas Credentials.
*/
def fromFile(path: String)(implicit spark: SparkSession): MenasPlainCredentials = {
val fsUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)
val fsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val conf = ConfigFactory.parseString(fsUtils.getFileContent(path))
val conf = ConfigFactory.parseString(fsUtils.getLocalOrDistributedFileContent(path))
MenasPlainCredentials(conf.getString("username"), conf.getString("password"))
}
}
Expand All @@ -55,9 +55,9 @@ object MenasKerberosCredentials {
* @return An instance of Menas Credentials.
*/
def fromFile(path: String)(implicit spark: SparkSession): MenasKerberosCredentials = {
val fsUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)
val fsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val localKeyTabPath = fsUtils.getLocalPathToFile(path)
val localKeyTabPath = fsUtils.getLocalPathToFileOrCopyToLocal(path)
@dk1844 (Contributor, Author), Sep 29, 2020:

The original logic is to load from a local path and then fall back to the same path on HDFS. Do we want the option to fall back to a menas-credentials file on S3?

Contributor:

Are you asking whether we want the ability to read the menas-credentials file from S3 in general? The answer is yes.

Or only in some special case?

@dk1844 (Contributor, Author), Sep 30, 2020:

Thanks, yes, the former (the ability to read the menas-credentials file from S3 in general).

However, the slight problem with the current flow is the fallback: the path is looked up locally, and if it doesn't exist, the same path is tried on HDFS and copied to a local temp location. That works reasonably for the local fs and HDFS; with S3, how do you fall back from a local path that does not exist on S3? (You need a bucket, for example.)

We can of course make it work with a file from S3 too, but the fallback is awkward here.

What is the point of falling back like this anyway? Isn't it more confusing than helpful?
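
Roughly, the fallback flow in question looks like this (a minimal sketch; the helper name and the temp-directory handling are illustrative, not the actual getLocalPathToFileOrCopyToLocal implementation):

```scala
import java.nio.file.{Files, Paths}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CredentialsPathSketch {
  // Local-first lookup with HDFS fallback; the method name is hypothetical.
  def localPathOrCopyFromHdfs(path: String, hadoopConf: Configuration): String = {
    if (Files.exists(Paths.get(path))) {
      path // the file exists on the local filesystem, use it as-is
    } else {
      // fall back: fetch the same path from HDFS into a local temp directory
      val tmpDir = Files.createTempDirectory("menas-credentials")
      val localCopy = new Path(tmpDir.toString, new Path(path).getName)
      FileSystem.get(hadoopConf).copyToLocalFile(new Path(path), localCopy)
      localCopy.toString
    }
  }
}
```

With S3 there is no obvious analogue of "the same path" without also knowing a bucket (and region), which is why dropping the fallback looks simpler.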

Contributor:

I think the fallback is not necessary.

val keytab = KeyTab.getInstance(localKeyTabPath)
val username = keytab.getOneName.getName

@@ -15,10 +15,12 @@

package za.co.absa.enceladus.dao.rest

import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
import org.mockito.scalatest.MockitoSugar
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers

abstract class BaseTestSuite extends WordSpec
abstract class BaseTestSuite extends AnyWordSpec
with Matchers
with MockitoSugar
with BeforeAndAfter
@@ -15,11 +15,12 @@

package za.co.absa.enceladus.dao.rest

import org.scalatest.{Matchers, WordSpec}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.enceladus.dao.UnauthorizedException
import za.co.absa.enceladus.dao.auth.{InvalidMenasCredentials, MenasKerberosCredentials, MenasPlainCredentials}

class RestDaoFactorySuite extends WordSpec with Matchers {
class RestDaoFactorySuite extends AnyWordSpec with Matchers {

private val menasApiBaseUrls = List("http://localhost:8080/menas/api")

@@ -16,15 +16,17 @@
package za.co.absa.enceladus.dao.rest.auth

import org.mockito.stubbing.OngoingStubbing
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
import org.scalatest.matchers.should.Matchers
import org.mockito.scalatest.MockitoSugar
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.BeforeAndAfter
import org.springframework.http.{HttpHeaders, ResponseEntity}
import org.springframework.util.LinkedMultiValueMap
import org.springframework.web.client.RestTemplate
import za.co.absa.enceladus.dao.UnauthorizedException
import za.co.absa.enceladus.dao.rest.{ApiCaller, ApiCallerStub, AuthClient}

abstract class AuthClientSuite() extends WordSpec
abstract class AuthClientSuite() extends AnyWordSpec
with Matchers
with MockitoSugar
with BeforeAndAfter {
@@ -15,12 +15,12 @@

package za.co.absa.enceladus.dao.rest.auth

import org.scalatest.WordSpec
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.enceladus.dao.auth.MenasPlainCredentials
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.fs.LocalFsUtils
import za.co.absa.enceladus.utils.testUtils.SparkTestBase

class MenasPlainCredentialsSuite extends WordSpec with SparkTestBase {
class MenasPlainCredentialsSuite extends AnyWordSpec with SparkTestBase {

"MenasPlainCredentials" should {
"be read from *.conf" in {
@@ -42,9 +42,7 @@ class MenasPlainCredentialsSuite extends WordSpec with SparkTestBase {
val homeDir = System.getProperty("user.home")
val expected = s"$homeDir/dir/file"

val fsUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)

val actual = fsUtils.replaceHome("~/dir/file")
val actual = LocalFsUtils.replaceHome("~/dir/file")
assert(actual == expected)
}
}
6 changes: 6 additions & 0 deletions data-model/pom.xml
@@ -53,6 +53,12 @@
<version>${scalatest.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-funsuite_${scala.compat.version}</artifactId>
<version>${scalatest.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -17,9 +17,10 @@ package za.co.absa.enceladus.model.conformanceRule

import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.scalatest.{Matchers, WordSpec}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ConformanceRuleTest extends WordSpec with Matchers {
class ConformanceRuleTest extends AnyWordSpec with Matchers {

private val objectMapper = new ObjectMapper()
.registerModule(DefaultScalaModule)
@@ -15,12 +15,12 @@

package za.co.absa.enceladus.model.menas.audit

import org.scalatest.FunSuite
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.model.conformanceRule.{DropConformanceRule, LiteralConformanceRule}
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule

class AuditableTest extends FunSuite {
class AuditableTest extends AnyFunSuite {
val obj1 = Dataset(name = "Test DS",
version = 0,
hdfsPath = "oldPath",
@@ -23,6 +23,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.RestDaoFactory
import za.co.absa.enceladus.examples.interpreter.rules.custom.UppercaseCustomConformanceRule
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample1 {
@@ -37,6 +38,8 @@ object CustomRuleSample1 {
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session

implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]) {
// scalastyle:off magic.number
val menasBaseUrls = List("http://localhost:8080/menas")
@@ -78,7 +81,7 @@ object CustomRuleSample1 {
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

outputData.show(false)
//scalastyle:on magicnumber
@@ -24,6 +24,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.LPadCustomConformanceRule
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample2 {
@@ -38,6 +39,8 @@ object CustomRuleSample2 {
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session

implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]) {
// scalastyle:off magic.number
val conf = ConfigFactory.load()
@@ -81,7 +84,7 @@ object CustomRuleSample2 {
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

outputData.show(false)
// scalastyle:on magic.number
@@ -24,6 +24,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.{LPadCustomConformanceRule, UppercaseCustomConformanceRule}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample3 {
@@ -33,6 +34,7 @@ object CustomRuleSample3 {
.config("spark.sql.codegen.wholeStage", value = false)
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]): Unit = {
val conf = ConfigFactory.load()
@@ -79,7 +81,7 @@ object CustomRuleSample3 {
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

outputData.show()
}
@@ -26,6 +26,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.{LPadCustomConformanceRule, UppercaseCustomConformanceRule}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample4 {
@@ -138,6 +139,7 @@ object CustomRuleSample4 {
def main(args: Array[String]): Unit = {
val cmd: CmdConfigLocal = getCmdLineArguments(args)
implicit val spark: SparkSession = buildSparkSession()
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val conf = ConfigFactory.load()
val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
@@ -186,7 +188,7 @@ object CustomRuleSample4 {
.setCatalystWorkaroundEnabled(true)
.setControlFrameworkEnabled(false)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)
outputData.show()
saveToCsv(outputData, cmd.outPath)
}
@@ -17,12 +17,13 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
import org.scalatest.funsuite.AnyFunSuite
import org.mockito.scalatest.MockitoSugar
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.SparkTestBase


@@ -32,11 +33,12 @@ object TestOutputRow {
def apply(input: TestInputRow, doneUpper: String): TestOutputRow = TestOutputRow(input.id, input.mandatoryString, input.nullableString, doneUpper)
}

class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase with MockitoSugar {
class UppercaseCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with MockitoSugar {
import spark.implicits._

implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val experimentalMR = true
val isCatalystWorkaroundEnabled = true
@@ -67,7 +69,7 @@ class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase wi
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: sql.DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: sql.DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

val output: Seq[TestOutputRow] = outputData.as[TestOutputRow].collect().toSeq
val expected: Seq[TestOutputRow] = (input zip Seq("HELLO WORLD", "ONE RING TO RULE THEM ALL", "ALREADY CAPS")).map(x => TestOutputRow(x._1, x._2))
@@ -101,7 +103,7 @@ class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase wi
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: sql.DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: sql.DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

val output: Seq[TestOutputRow] = outputData.as[TestOutputRow].collect().toSeq
val expected: Seq[TestOutputRow] = (input zip Seq("1", "4", "9")).map(x => TestOutputRow(x._1, x._2))
@@ -134,7 +136,7 @@ class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase wi
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: sql.DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: sql.DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

val output: List[TestOutputRow] = outputData.as[TestOutputRow].collect().toList
val expected: List[TestOutputRow] = (input zip Seq("WHAT A BEAUTIFUL PLACE", "ONE RING TO FIND THEM", null)).map(x => TestOutputRow(x._1, x._2)).toList