#48 Add explicit saving test (loader non-"", storer = defined)
dk1844 committed Dec 14, 2020
1 parent 6872dfa commit c140413
Showing 1 changed file with 40 additions and 29 deletions.
@@ -3,16 +3,21 @@ package za.co.absa.atum
 import org.apache.hadoop.fs.FileSystem
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 import za.co.absa.atum.model.{Checkpoint, Measurement}
 import za.co.absa.atum.persistence.ControlMeasuresParser
 import za.co.absa.atum.utils.SparkTestBase
 
-class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers {
+class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers with BeforeAndAfterAll {
 
   private val log = LogManager.getLogger(this.getClass)
+  val tempDir: String = LocalFsTestUtils.createLocalTemporaryDirectory("hdfsTestOutput")
+
+  override def afterAll = {
+    LocalFsTestUtils.safeDeleteTestDir(tempDir)
+  }
 
   private val inputCsv = "data/input/wikidata.csv"
   private def readSparkInputCsv(inputCsvPath: String): DataFrame = spark.read
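
This first hunk moves the temporary output directory from a per-test concern (created inside the test body and deleted at its end, which the removed "todo beforeAll+cleanup afterAll?" note in the hunk below flagged) to the suite lifecycle: created once at construction, removed once in BeforeAndAfterAll's afterAll hook. A minimal, self-contained sketch of the same pattern, assuming plain java.nio in place of the repository's LocalFsTestUtils helpers (the suite name here is hypothetical):

import java.nio.file.{Files, Path}
import java.util.Comparator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec

class TempDirLifecycleSketch extends AnyFlatSpec with BeforeAndAfterAll {

  // created once, when the suite is instantiated
  private val tempDir: Path = Files.createTempDirectory("hdfsTestOutput")

  // removed once, after the last test of the suite has run
  override def afterAll(): Unit = {
    Files.walk(tempDir)
      .sorted(Comparator.reverseOrder()) // delete children before parents
      .forEach(p => Files.delete(p))
  }

  "every test in the suite" should "see the same directory" in {
    assert(Files.exists(tempDir))
  }
}
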
@@ -24,42 +29,48 @@ class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Match
     df.write.mode(SaveMode.Overwrite)
       .parquet(outputPath)
 
-  "_INFO" should "be written implicitly on spark.write" in {
-    val tempDir = LocalFsTestUtils.createLocalTemporaryDirectory("hdfsTestOutput") // todo beforeAll+cleanup afterAll?
+  {
+    val outputPath = s"$tempDir/outputCheck1"
+    // The implicit variant writes only to the derived outputPath; the explicit variant writes to both the implicitly derived path and the explicit one.
+    Seq(
+      ("implicit output _INFO path only", "", Seq(s"$outputPath/_INFO")),
+      ("implicit & explicit output _INFO path", s"$outputPath/extra/_INFO2", Seq(s"$outputPath/_INFO", s"$outputPath/extra/_INFO2"))
+    ).foreach { case (testCaseName, destinationInfoFilePath, expectedPaths) =>
 
-    import spark.implicits._
-    import za.co.absa.atum.AtumImplicits._
+      "_INFO" should s"be written on spark.write ($testCaseName)" in {
+        import spark.implicits._
+        import za.co.absa.atum.AtumImplicits._
 
-    val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
-    implicit val fs = FileSystem.get(hadoopConfiguration)
+        val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
+        implicit val fs = FileSystem.get(hadoopConfiguration)
 
-    // Initializing library to hook up to Apache Spark
-    spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/wikidata.csv.info") // todo version with None, None, too?
-      .setControlMeasuresWorkflow("Job 1")
+        // Initializing the library to hook up to Apache Spark
+        spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/wikidata.csv.info", destinationInfoFile = destinationInfoFilePath)
+          .setControlMeasuresWorkflow("Job 1")
 
-    val df1 = readSparkInputCsv(inputCsv)
-    df1.setCheckpoint("Checkpoint0")
-    val filteredDf1 = df1.filter($"total_response_size" > 1000)
-    filteredDf1.setCheckpoint("Checkpoint1") // stateful, the return value is not needed
-
-    val outputPath = s"$tempDir/hdfsOutput/implicitTest1"
-    writeSparkData(filteredDf1, outputPath)
+        val df1 = readSparkInputCsv(inputCsv)
+        df1.setCheckpoint("Checkpoint0")
+        val filteredDf1 = df1.filter($"total_response_size" > 1000)
+        filteredDf1.setCheckpoint("Checkpoint1") // stateful, the return value is not needed
+        writeSparkData(filteredDf1, outputPath) // the implicit output _INFO file path is derived from this path passed to spark.write
 
-    spark.disableControlMeasuresTracking()
+        spark.disableControlMeasuresTracking()
 
-    log.info(s"Checking $outputPath/_INFO to contain expected values")
+        expectedPaths.foreach { expectedPath =>
+          log.info(s"Checking $expectedPath to contain expected values")
 
-    val infoContentJson = LocalFsTestUtils.readFileAsString(s"$outputPath/_INFO")
-    val infoControlMeasures = ControlMeasuresParser.fromJson(infoContentJson)
+          val infoContentJson = LocalFsTestUtils.readFileAsString(expectedPath)
+          val infoControlMeasures = ControlMeasuresParser.fromJson(infoContentJson)
 
-    infoControlMeasures.checkpoints.map(_.name) shouldBe Seq("Source", "Raw", "Checkpoint0", "Checkpoint1")
-    val checkpoint0 = infoControlMeasures.checkpoints.collectFirst{ case c: Checkpoint if c.name == "Checkpoint0" => c }.get // todo generalize
-    checkpoint0.controls should contain (Measurement("recordCount", "count", "*", "5000"))
+          infoControlMeasures.checkpoints.map(_.name) shouldBe Seq("Source", "Raw", "Checkpoint0", "Checkpoint1")
+          val checkpoint0 = infoControlMeasures.checkpoints.collectFirst { case c: Checkpoint if c.name == "Checkpoint0" => c }.get
+          checkpoint0.controls should contain(Measurement("recordCount", "count", "*", "5000"))
 
-    val checkpoint1 = infoControlMeasures.checkpoints.collectFirst{ case c: Checkpoint if c.name == "Checkpoint1" => c }.get
-    checkpoint1.controls should contain (Measurement("recordCount", "count", "*", "4964"))
-
-    LocalFsTestUtils.safeDeleteTestDir(tempDir)
-  }
+          val checkpoint1 = infoControlMeasures.checkpoints.collectFirst { case c: Checkpoint if c.name == "Checkpoint1" => c }.get
+          checkpoint1.controls should contain(Measurement("recordCount", "count", "*", "4964"))
+        }
+      }
+    }
+  }
 }
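
The parametrized test above pivots on one API behavior: enableControlMeasuresTracking accepts both a sourceInfoFile and a destinationInfoFile, and an empty destinationInfoFile means the _INFO location is derived solely from the path later handed to spark.write, while a non-empty one is written in addition to the derived file. A hedged sketch of how a job could drive either variant (runJob, its parameters, and the object name are illustrative, not part of the commit; the Atum calls mirror the test above):

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ExplicitInfoFileSketch {

  // explicitInfoPath = "" reproduces the implicit-only case: Atum derives
  // <outputPath>/_INFO from the path given to spark.write. A non-empty path
  // receives the _INFO file as well, on top of the derived one.
  def runJob(spark: SparkSession, df: DataFrame, outputPath: String, explicitInfoPath: String): Unit = {
    import za.co.absa.atum.AtumImplicits._
    implicit val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    spark.enableControlMeasuresTracking(
        sourceInfoFile = "data/input/wikidata.csv.info",
        destinationInfoFile = explicitInfoPath)
      .setControlMeasuresWorkflow("Job 1")

    df.setCheckpoint("Checkpoint0")                       // record control measures
    df.write.mode(SaveMode.Overwrite).parquet(outputPath) // triggers the _INFO save(s)

    spark.disableControlMeasuresTracking()
  }
}

Called with explicitInfoPath = s"$outputPath/extra/_INFO2", this should leave both $outputPath/_INFO and $outputPath/extra/_INFO2 behind, which is exactly what the second test case asserts.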
