Hadoop FS API usage #44

Merged: 6 commits, Nov 5, 2020
64 changes: 33 additions & 31 deletions atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala
@@ -15,14 +15,15 @@

package za.co.absa.atum

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import za.co.absa.atum.core.Atum.controlFrameworkState
import za.co.absa.atum.core.{Atum, Constants}
import za.co.absa.atum.persistence._
import za.co.absa.atum.persistence.hdfs.{ControlMeasuresHdfsLoaderJsonFile, ControlMeasuresHdfsStorerJsonFile}
import za.co.absa.atum.persistence.s3.{ControlMeasuresS3LoaderJsonFile, ControlMeasuresS3StorerJsonFile}
import za.co.absa.atum.persistence.s3.{ControlMeasuresSdkS3LoaderJsonFile, ControlMeasuresSdkS3StorerJsonFile}
import za.co.absa.atum.utils.InfoFile

import scala.language.implicitConversions

@@ -64,58 +65,59 @@ object AtumImplicits {
* Input and output info file names will be inferred automatically based on data source and destination paths
*
*/
def enableControlMeasuresTracking(): SparkSession = {
def enableControlMeasuresTracking()(implicit outputFs: FileSystem): SparkSession = {
enableControlMeasuresTracking(None, None)
}

/**
* Enable control measurements tracking on HDFS.
* Enable control measurements tracking on HDFS | S3 (using Hadoop FS API).
* Both input and output info file paths need to be provided
*
* Example info file path name: "data/input/wikidata.csv.info"
* Example info file path name: "data/input/wikidata.csv.info" or "s3://bucket1/folder1/wikidata.csv.info"
*
* @param sourceInfoFile Pathname to a json-formatted info file containing control measurements
* @param destinationInfoFile Pathname to save the control measurement results to
*/
def enableControlMeasuresTracking(sourceInfoFile: String = "",
destinationInfoFile: String = ""): SparkSession = {
val hadoopConfiguration = sparkSession.sparkContext.hadoopConfiguration
implicit val hadoopConfiguration = sparkSession.sparkContext.hadoopConfiguration

val loader = if (sourceInfoFile.isEmpty) None else Some(new DefaultControlInfoLoader(hadoopConfiguration, sourceInfoFile.toPath))
val storer = if (destinationInfoFile.isEmpty) None else Some(new DefaultControlInfoStorer(hadoopConfiguration, destinationInfoFile.toPath))
val loader = InfoFile(sourceInfoFile).toOptDefaultControlInfoLoader
val storer = InfoFile(destinationInfoFile).toOptDefaultControlInfoStorer

enableControlMeasuresTracking(loader, storer)
}

/**
* Enable S3-based control measurements tracking.
* Enable S3-based control measurements tracking via the AWS SDK for S3
*
* @param sourceS3Location s3 location to load info files from in S3
* @param sourceS3Location s3 location to load info files from in S3
* @param destinationS3Config s3 location and kms settings to save the data to in S3
* @param credentialsProvider If you do not have a specific Credentials provider, use the default
* {@link software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider#create()}
* { @link software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider#create()}
* @return spark session with atum tracking enabled
*/
def enableControlMeasuresTrackingForS3(sourceS3Location: Option[S3Location],
destinationS3Config: Option[(S3Location, S3KmsSettings)])
(implicit credentialsProvider: AwsCredentialsProvider): SparkSession = {
def enableControlMeasuresTrackingForSdkS3(sourceS3Location: Option[SimpleS3LocationWithRegion],
destinationS3Config: Option[(SimpleS3LocationWithRegion, S3KmsSettings)])
(implicit credentialsProvider: AwsCredentialsProvider): SparkSession = {

val loader = sourceS3Location.map(new ControlMeasuresS3LoaderJsonFile(_))
val loader = sourceS3Location.map(ControlMeasuresSdkS3LoaderJsonFile(_))
val storer = destinationS3Config.map { case (destLoc, kms) =>
new ControlMeasuresS3StorerJsonFile(destLoc, kms)
ControlMeasuresSdkS3StorerJsonFile(destLoc, kms)
}

enableControlMeasuresTracking(loader, storer)
}

/**
* Enable control measurements tracking.
* This is a generic way to enable control measurements tracking enabling to provide a custom
* control measurements loader and storer objects
*
* @param loader An object responsible for loading data source control measurements
* @param storer An object responsible for storing the result control measurements
*/
* Enable control measurements tracking.
* This is a generic way to enable control measurements tracking, allowing custom
* control measurements loader and storer objects to be provided
*
* @param loader An object responsible for loading data source control measurements
* @param storer An object responsible for storing the result control measurements
*
*/
def enableControlMeasuresTracking(loader: Option[ControlMeasuresLoader],
storer: Option[ControlMeasuresStorer]): SparkSession =
sparkSession.synchronized {
@@ -241,7 +243,7 @@ object AtumImplicits {
*
* @param name Name of the checkpoint
*/
def setCheckpoint(name: String, persistInDatabase: Boolean = true): Dataset[Row] = {
def setCheckpoint(name: String, persistInDatabase: Boolean = true)(implicit inputFs: FileSystem): Dataset[Row] = {
if (!(dataset.sparkSession.sessionState.conf contains Constants.InitFlagKey))
throw new IllegalStateException("Control framework tracking is not initialized.")
if (Atum.controlFrameworkState == null) {
@@ -278,7 +280,7 @@
* @param oldName The original column name
* @param newName The new column name
*/
def registerColumnRename(oldName: String, newName: String): Dataset[Row] = {
def registerColumnRename(oldName: String, newName: String)(implicit inputFs: FileSystem): Dataset[Row] = {
if (!(dataset.sparkSession.sessionState.conf contains Constants.InitFlagKey))
throw new IllegalStateException("Control framework tracking is not initialized.")
controlFrameworkState.registerColumnRename(dataset, oldName, newName)
@@ -290,7 +292,7 @@
*
* @param columnName A column to be dropped from measurements
*/
def registerColumnDrop(columnName: String): Dataset[Row] = {
def registerColumnDrop(columnName: String)(implicit inputFs: FileSystem): Dataset[Row] = {
if (!(dataset.sparkSession.sessionState.conf contains Constants.InitFlagKey))
throw new IllegalStateException("Control framework tracking is not initialized.")
controlFrameworkState.registerColumnDrop(dataset, columnName)
@@ -302,7 +304,7 @@
* to ControlFrameworkKeys.InfoFileVersionKey Spark Session Key
*
*/
def loadControlInfoFile(): Dataset[Row] = {
def loadControlInfoFile(implicit inputFs: FileSystem): Dataset[Row] = {
Atum.controlFrameworkState.initializeControlInfo(dataset)
dataset
}
@@ -312,13 +314,13 @@
*
* @param outputPath A directory or a file name to save the info file to.
*/
def writeInfoFile(outputPath: String): Dataset[Row] = {
Atum.controlFrameworkState.storeCurrentInfoFileOnHdfs(outputPath.toPath)
def writeInfoFile(outputPath: String)(implicit outputFs: FileSystem): Dataset[Row] = {
Atum.controlFrameworkState.storeCurrentInfoFile(outputPath.toPath)
dataset
}

def writeInfoFileOnS3(s3Location: S3Location, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Dataset[Row] = {
Atum.controlFrameworkState.storeCurrentInfoFileOnS3(s3Location, s3KmsSettings)
def writeInfoFileOnS3(s3Location: SimpleS3LocationWithRegion, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Dataset[Row] = {
Atum.controlFrameworkState.storeCurrentInfoFileOnSdkS3(s3Location, s3KmsSettings)
dataset
}
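
For orientation, a minimal call-site sketch of the new Hadoop FS based API (the spark session name, the CSV read and the example paths are illustrative, not part of this PR):

import org.apache.hadoop.fs.FileSystem
import za.co.absa.atum.AtumImplicits._

// Default FileSystem from the Hadoop configuration; for an S3 destination it would instead be
// resolved from the path, e.g. new Path("s3a://bucket1/folder1").getFileSystem(hadoopConf).
implicit val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

spark.enableControlMeasuresTracking(
  sourceInfoFile = "data/input/wikidata.csv.info",
  destinationInfoFile = "data/output/wikidata.csv.info")

spark.read.option("header", "true").csv("data/input/wikidata.csv")
  .setCheckpoint("Source")       // loads the source info file via the implicit FileSystem
  .writeInfoFile("data/output/") // stores the measurements via the implicit FileSystem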

5 changes: 3 additions & 2 deletions atum/src/main/scala/za/co/absa/atum/core/Atum.scala
@@ -15,6 +15,7 @@

package za.co.absa.atum.core

import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.LogManager
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.util.QueryExecutionListener
@@ -204,12 +205,12 @@ object Atum {
controlFrameworkState.workflowName = workflowName
}

private[atum] def registerColumnRename(dataset: Dataset[Row], oldName: String, newName: String): Unit = {
private[atum] def registerColumnRename(dataset: Dataset[Row], oldName: String, newName: String)(implicit inputFs: FileSystem): Unit = {
preventNotInitialized()
controlFrameworkState.registerColumnRename(dataset, oldName, newName)
}

private[atum] def registerColumnDrop(dataset: Dataset[Row], columnName: String): Unit = {
private[atum] def registerColumnDrop(dataset: Dataset[Row], columnName: String)(implicit inputFs: FileSystem): Unit = {
preventNotInitialized()
controlFrameworkState.registerColumnDrop(dataset, columnName)
}
@@ -25,10 +25,10 @@ import za.co.absa.atum.core.Atum.log
import za.co.absa.atum.core.ControlType.Count
import za.co.absa.atum.model.{RunError, RunState, _}
import za.co.absa.atum.persistence.hdfs.ControlMeasuresHdfsStorerJsonFile
import za.co.absa.atum.persistence.s3.ControlMeasuresS3StorerJsonFile
import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresStorer, S3KmsSettings, S3Location}
import za.co.absa.atum.persistence.s3.ControlMeasuresSdkS3StorerJsonFile
import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresStorer, S3KmsSettings, SimpleS3LocationWithRegion}
import za.co.absa.atum.plugins.EventListener
import za.co.absa.atum.utils.ExecutionPlanUtils.inferInputInfoFileName
import za.co.absa.atum.utils.ExecutionPlanUtils.inferInputInfoFilePath

import scala.util.control.NonFatal

@@ -100,23 +100,24 @@ class ControlFrameworkState(sparkSession: SparkSession) {
cacheStorageLevel = None
}

private[atum] def registerColumnRename(dataset: Dataset[Row], oldName: String, newName: String): Unit = {
private[atum] def registerColumnRename(dataset: Dataset[Row], oldName: String, newName: String)(implicit inputFs: FileSystem): Unit = {
initializeControlInfo(dataset)
if (processor == null) {
initializeProcessor(dataset.sparkSession)
}
processor.registerColumnRename(oldName, newName)
}

private[atum] def registerColumnDrop(dataset: Dataset[Row], columnName: String): Unit = {
private[atum] def registerColumnDrop(dataset: Dataset[Row], columnName: String)(implicit inputFs: FileSystem): Unit = {
initializeControlInfo(dataset)
if (processor == null) {
initializeProcessor(dataset.sparkSession)
}
processor.registerColumnDrop(columnName)
}

private[atum] def calculateCheckpoint(dataset: Dataset[Row], name: String, delayCheckpointPersistence: Boolean): Dataset[Row] = {
private[atum] def calculateCheckpoint(dataset: Dataset[Row], name: String, delayCheckpointPersistence: Boolean)
(implicit inputFs: FileSystem): Dataset[Row] = {
initializeControlInfo(dataset)
if (processor == null) {
initializeProcessor(dataset.sparkSession)
@@ -228,10 +229,10 @@ class ControlFrameworkState(sparkSession: SparkSession) {
}
}

private[atum] def initializeControlInfo(dataset: Dataset[Row]): Unit = {
private[atum] def initializeControlInfo(dataset: Dataset[Row])(implicit inputFs: FileSystem): Unit = {
if (!accumulator.isControlMeasuresLoaded) {
val s = inferInputInfoFileName(dataset, inputInfoFileName)
accumulator.loadControlMeasurements(new DefaultControlInfoLoader(sparkSession.sparkContext.hadoopConfiguration, s))
val infoFilePath = inferInputInfoFilePath(dataset, inputInfoFileName)
accumulator.loadControlMeasurements(new DefaultControlInfoLoader(infoFilePath))
}
}

@@ -248,21 +249,20 @@
}
}

private[atum] def storeCurrentInfoFileOnS3(s3Location: S3Location, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val storer = new ControlMeasuresS3StorerJsonFile(s3Location, s3KmsSettings)
private[atum] def storeCurrentInfoFileOnSdkS3(s3Location: SimpleS3LocationWithRegion, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val storer = ControlMeasuresSdkS3StorerJsonFile(s3Location, s3KmsSettings)
storer.store(accumulator.getControlMeasure)
Atum.log.info(s"Control measurements saved to ${s3Location.s3String()}")
Atum.log.info(s"Control measurements saved to ${s3Location.s3String}")
}

private[atum] def storeCurrentInfoFileOnHdfs(outputHDFSPathFileName: Path, hadoopConfiguration: Configuration = sparkSession.sparkContext.hadoopConfiguration): Unit = {
val fs = FileSystem.get(hadoopConfiguration)
val outputFilePath = if (fs.isDirectory(outputHDFSPathFileName)) {
new Path(outputHDFSPathFileName, outputInfoFileName)
private[atum] def storeCurrentInfoFile(outputInfoFilePath: Path)(implicit outputFs: FileSystem): Unit = {
val outputFilePath = if (outputFs.isDirectory(outputInfoFilePath)) {
new Path(outputInfoFilePath, outputInfoFileName)
} else {
outputHDFSPathFileName
outputInfoFilePath
}

val storer = new ControlMeasuresHdfsStorerJsonFile(hadoopConfiguration, outputFilePath)
val storer = ControlMeasuresHdfsStorerJsonFile(outputFilePath)
storer.store(accumulator.getControlMeasure)
Atum.log.info(s"Control measurements saved to ${outputFilePath.toUri.toString}")
}
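
A small sketch of the resolution rule applied above, now driven by the implicit output FileSystem (the helper name and the "_INFO" default are illustrative assumptions):

import org.apache.hadoop.fs.{FileSystem, Path}

// Directory destinations get the configured info file name appended; file destinations are used as-is.
def resolveInfoFilePath(destination: Path, infoFileName: String = "_INFO")
                       (implicit outputFs: FileSystem): Path =
  if (outputFs.isDirectory(destination)) new Path(destination, infoFileName)
  else destination
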
@@ -17,11 +17,12 @@ package za.co.absa.atum.core

import java.io.{PrintWriter, StringWriter}

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import za.co.absa.atum.persistence.{S3ControlMeasuresStorer, S3KmsSettings}
import za.co.absa.atum.persistence.{HadoopFsControlMeasuresStorer, S3ControlMeasuresStorer, S3KmsSettings}
import za.co.absa.atum.utils.ExecutionPlanUtils._
import za.co.absa.atum.utils.S3Utils

@@ -35,15 +36,15 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut

cf.accumulator.getStorer match {
case Some(s3storer: S3ControlMeasuresStorer) =>
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String()}")
writeInfoFileForQueryForS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}")
writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)

case Some(_) =>
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to HDFS")
writeInfoFileForQuery(qe)
case Some(hadoopStorer: HadoopFsControlMeasuresStorer) =>
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to Hadoop FS")
writeInfoFileForQuery(qe)(hadoopStorer.outputFs)

case None =>
Atum.log.info("No storer is set, therefore no data will be written the automatically with DF-save to an _INFO file.")
case _ =>
Atum.log.info("No usable storer is set, therefore no data will be automatically written to an _INFO file on DF-save.")
}

// Notify listeners
@@ -63,13 +64,13 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut
}

/** Write _INFO file with control measurements to the output directory based on the query plan */
private def writeInfoFileForQuery(qe: QueryExecution): Unit = {
private def writeInfoFileForQuery(qe: QueryExecution)(implicit outputFs: FileSystem): Unit = {
val infoFilePath = inferOutputInfoFileName(qe, cf.outputInfoFileName)

// Write _INFO file to the output directory
infoFilePath.foreach(path => {
Atum.log.info(s"Inferred _INFO Path = ${path.toUri.toString}")
cf.storeCurrentInfoFileOnHdfs(path, qe.sparkSession.sparkContext.hadoopConfiguration)
cf.storeCurrentInfoFile(path)
})

// Write _INFO file to a registered storer
@@ -79,17 +80,17 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut
}

/** Write _INFO file with control measurements to the output directory based on the query plan */
private def writeInfoFileForQueryForS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
private def writeInfoFileForQueryForSdkS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val infoFilePath = inferOutputInfoFileNameOnS3(qe, cf.outputInfoFileName)

// Write _INFO file to the output directory
infoFilePath.foreach(path => {

import S3Utils.StringS3LocationExt
val location = path.toS3Location(region)
val location = path.toS3LocationOrFail.withRegion(region)

Atum.log.debug(s"Inferred _INFO Location = $location")
cf.storeCurrentInfoFileOnS3(location, kmsSettings)
cf.storeCurrentInfoFileOnSdkS3(location, kmsSettings)
})

// Write _INFO file to a registered storer
@@ -15,6 +15,7 @@

package za.co.absa.atum.persistence

import org.apache.hadoop.fs.FileSystem
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import za.co.absa.atum.model.ControlMeasure

@@ -24,9 +25,13 @@ trait ControlMeasuresStorer {
def getInfo: String
}

trait HadoopFsControlMeasuresStorer extends ControlMeasuresStorer {
def outputFs: FileSystem
}


trait S3ControlMeasuresStorer extends ControlMeasuresStorer {
def kmsSettings: S3KmsSettings
def outputLocation: S3Location

def outputLocation: SimpleS3LocationWithRegion
def credentialsProvider: AwsCredentialsProvider
}
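
For context, a hypothetical sketch of a custom implementation of the new HadoopFsControlMeasuresStorer trait (the class name is made up, the JSON serialization is elided, and the store(ControlMeasure) signature is assumed from the existing storers; ControlMeasuresHdfsStorerJsonFile is the real implementation in this PR):

import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.model.ControlMeasure

case class MyJsonFileStorer(outputPath: Path)(implicit val outputFs: FileSystem)
  extends HadoopFsControlMeasuresStorer {

  override def store(controlMeasure: ControlMeasure): Unit = {
    // serialize controlMeasure to JSON and write it to outputPath via outputFs (elided)
  }

  override def getInfo: String = s"JSON serializer to ${outputPath.toUri}"
}
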
@@ -15,21 +15,20 @@

package za.co.absa.atum.persistence.hdfs

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresParser}
import za.co.absa.atum.utils.{ControlUtils, HdfsFileUtils}

/** A loader of control measurements from a JSON file stored in HDFS filesystem. */
class ControlMeasuresHdfsLoaderJsonFile(hadoopConfiguration: Configuration, path: Path) extends ControlMeasuresLoader {
/** A loader of control measurements from a JSON file stored in hadoop filesystem. */
case class ControlMeasuresHdfsLoaderJsonFile(path: Path)
(implicit inputFs: FileSystem) extends ControlMeasuresLoader {
override def load(): ControlMeasure = {

implicit val fs = FileSystem.get(hadoopConfiguration)
val controlInfoJson = HdfsFileUtils.readHdfsFileToString(path)

ControlUtils.preprocessControlMeasure(ControlMeasuresParser fromJson controlInfoJson)
}

override def getInfo: String = {
s"JSON deserializer from ${path.toUri}"
}
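
A hypothetical usage sketch of the refactored loader, with the FileSystem resolved from the info file path's scheme and passed implicitly (the spark session and the path are illustrative):

import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.persistence.hdfs.ControlMeasuresHdfsLoaderJsonFile

val infoFilePath = new Path("s3a://bucket1/folder1/wikidata.csv.info")
implicit val inputFs: FileSystem = infoFilePath.getFileSystem(spark.sparkContext.hadoopConfiguration)

val controlMeasure = ControlMeasuresHdfsLoaderJsonFile(infoFilePath).load()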