#48 implicit saving test adding 1 (loader non-"", storer = "")
dk1844 committed Dec 3, 2020
1 parent 5c8057d commit 6872dfa
Showing 2 changed files with 108 additions and 0 deletions.
@@ -0,0 +1,65 @@
package za.co.absa.atum

import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.model.{Checkpoint, Measurement}
import za.co.absa.atum.persistence.ControlMeasuresParser
import za.co.absa.atum.utils.SparkTestBase

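/**
 * Verifies that Atum implicitly writes the `_INFO` control measures file
 * when a DataFrame with tracking enabled is written via `spark.write`.
 */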
class HdfsInfoIntegrationSuite extends AnyFlatSpec with SparkTestBase with Matchers {

  private val log = LogManager.getLogger(this.getClass)

  private val inputCsv = "data/input/wikidata.csv"

  private def readSparkInputCsv(inputCsvPath: String): DataFrame = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(inputCsvPath)

  private def writeSparkData(df: DataFrame, outputPath: String): Unit =
    df.write.mode(SaveMode.Overwrite)
      .parquet(outputPath)

  "_INFO" should "be written implicitly on spark.write" in {
    val tempDir = LocalFsTestUtils.createLocalTemporaryDirectory("hdfsTestOutput") // todo beforeAll+cleanup afterAll?

    import spark.implicits._
    import za.co.absa.atum.AtumImplicits._

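    // Atum needs an implicit Hadoop FileSystem in scope; the tracking set up below
    // uses it to load the source .info file and to store the resulting _INFO file.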
    val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
    implicit val fs: FileSystem = FileSystem.get(hadoopConfiguration)

    // Initializing the library to hook it up to Apache Spark
    spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/wikidata.csv.info") // todo version with None, None, too?
      .setControlMeasuresWorkflow("Job 1")

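    // Each checkpoint records control measurements (e.g. the record count) at that stage of the pipeline.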
    val df1 = readSparkInputCsv(inputCsv)
    df1.setCheckpoint("Checkpoint0")
    val filteredDf1 = df1.filter($"total_response_size" > 1000)
    filteredDf1.setCheckpoint("Checkpoint1") // setCheckpoint is stateful; the return value is not needed

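    // The spark.write below is expected to implicitly produce an _INFO file next to the Parquet output.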
    val outputPath = s"$tempDir/hdfsOutput/implicitTest1"
    writeSparkData(filteredDf1, outputPath)

    spark.disableControlMeasuresTracking()

    log.info(s"Checking $outputPath/_INFO to contain expected values")

    val infoContentJson = LocalFsTestUtils.readFileAsString(s"$outputPath/_INFO")
    val infoControlMeasures = ControlMeasuresParser.fromJson(infoContentJson)

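    // "Source" and "Raw" come from the loaded wikidata.csv.info file;
    // "Checkpoint0" and "Checkpoint1" were set in this test.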
    infoControlMeasures.checkpoints.map(_.name) shouldBe Seq("Source", "Raw", "Checkpoint0", "Checkpoint1")

    val checkpoint0 = infoControlMeasures.checkpoints.collectFirst { case c: Checkpoint if c.name == "Checkpoint0" => c }.get // todo generalize
    checkpoint0.controls should contain (Measurement("recordCount", "count", "*", "5000"))

    val checkpoint1 = infoControlMeasures.checkpoints.collectFirst { case c: Checkpoint if c.name == "Checkpoint1" => c }.get
    checkpoint1.controls should contain (Measurement("recordCount", "count", "*", "4964"))

    LocalFsTestUtils.safeDeleteTestDir(tempDir)
  }
}
43 changes: 43 additions & 0 deletions examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
@@ -0,0 +1,43 @@
package za.co.absa.atum

import java.io.File
import java.nio.file.Files

import org.apache.commons.io.FileUtils
import org.apache.log4j.LogManager

import scala.io.Source
import scala.util.control.NonFatal

object LocalFsTestUtils {
  private val log = LogManager.getLogger(this.getClass)

  /**
   * Creates a temporary directory in the local filesystem.
   *
   * @param prefix A prefix to use for the temporary directory.
   * @return A path to a temporary directory.
   */
  def createLocalTemporaryDirectory(prefix: String): String = {
    val tmpPath = Files.createTempDirectory(prefix)
    tmpPath.toAbsolutePath.toString
  }

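  /** Deletes the given test directory, only logging a warning on failure. */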
  def safeDeleteTestDir(path: String): Unit = {
    try {
      FileUtils.deleteDirectory(new File(path))
    } catch {
      case NonFatal(e) => log.warn(s"Unable to delete a test directory $path", e)
    }
  }

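  /** Reads the whole file into a single string, joining lines with `lineSeparator`. */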
  def readFileAsString(filename: String, lineSeparator: String = "\n"): String = {
    val sourceFile = Source.fromFile(filename)
    try {
      sourceFile.getLines().mkString(lineSeparator)
    } finally {
      sourceFile.close()
    }
  }
}
