Skip to content

Commit

Permalink
S3Location moved to absa commons (#71)
Browse files Browse the repository at this point in the history
* S3Location moved to absa commons
* absa commons 0.0.27 final version used
  • Loading branch information
dk1844 authored Apr 7, 2021
1 parent 2e3cde9 commit 62eea2b
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 138 deletions.
6 changes: 6 additions & 0 deletions atum-s3-sdk-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>za.co.absa.commons</groupId>
<artifactId>commons_${scala.binary.version}</artifactId>
<version>${commons.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ class ControlFrameworkStateSdkS3(sparkSession: SparkSession) extends ControlFram
val storer = ControlMeasuresSdkS3StorerJsonFile(s3Location, s3KmsSettings)

storer.store(accumulator.getControlMeasure)
AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.s3String}")
AtumSdkS3.log.info(s"Control measurements saved to ${s3Location.asSimpleS3LocationString}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import software.amazon.awssdk.regions.Region
import za.co.absa.atum.persistence.S3ControlMeasuresStorer
import za.co.absa.atum.persistence.s3.S3KmsSettings
import za.co.absa.atum.utils.ExecutionPlanUtils
import za.co.absa.commons.s3.SimpleS3Location

/**
* The class is responsible for listening to DataSet save events and outputting corresponding control measurements.
Expand All @@ -33,7 +34,7 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S
// adding s3 processing
cf.accumulator.getStorer match {
case Some(s3storer: S3ControlMeasuresStorer) =>
AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String}")
AtumSdkS3.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.asSimpleS3LocationString}")
writeInfoFileForQueryForSdkS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)

// Notify listeners
Expand All @@ -48,17 +49,17 @@ class SparkQueryExecutionListenerSdkS3(cf: ControlFrameworkStateSdkS3) extends S
}
}

/** Write _INFO file with control measurements to the output directory based on the query plan */
/** Write _INFO file with control measurements to the output directory based on the query plan */
private def writeInfoFileForQueryForSdkS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val infoFilePath = ExecutionPlanUtils.inferOutputInfoFileNameOnS3(qe, cf.outputInfoFileName)

// Write _INFO file to the output directory
infoFilePath.foreach(path => {

import za.co.absa.atum.persistence.s3.S3LocationRegionImplicits.SimpleS3LocationRegionExt
import za.co.absa.atum.location.S3Location.StringS3LocationExt

val location = path.toS3LocationOrFail.withRegion(region)
val location = SimpleS3Location(path) // would throw IAE on apply (parse error)
.withRegion(region)

AtumSdkS3.log.debug(s"Inferred _INFO Location = $location")
cf.storeCurrentInfoFileOnSdkS3(location, kmsSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class ControlMeasuresSdkS3LoaderJsonFile(inputLocation: SimpleS3LocationWit
}

override def getInfo: String = {
s"JSON deserializer from ${inputLocation.s3String}"
s"JSON deserializer from ${inputLocation.asSimpleS3LocationString}"
}

private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(inputLocation.region, credentialsProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class ControlMeasuresSdkS3StorerJsonFile(outputLocation: SimpleS3LocationWi
}

override def getInfo: String = {
s"JSON serializer for Storer to ${outputLocation.s3String}"
s"JSON serializer for Storer to ${outputLocation.asSimpleS3LocationString}"
}

private[s3] def getS3Client: S3Client = SdkS3ClientUtils.getS3Client(outputLocation.region, credentialsProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ package za.co.absa.atum.persistence.s3

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.ServerSideEncryption
import za.co.absa.atum.location.S3Location
import za.co.absa.commons.s3.{S3Location, SimpleS3Location}

trait Regional {
def region: Region
}

case class SimpleS3LocationWithRegion(protocol: String, bucketName: String, path: String, region: Region) extends S3Location with Regional {
def withRegion(region: Region): SimpleS3LocationWithRegion = this.copy(region = region)

override def asSimpleS3LocationString: String = SimpleS3Location(protocol, bucketName, path).asSimpleS3LocationString
}

case class S3KmsSettings(kmsKeyId: String, serverSideEncryption: ServerSideEncryption = ServerSideEncryption.AWS_KMS)
Expand Down
6 changes: 6 additions & 0 deletions atum/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>za.co.absa.commons</groupId>
<artifactId>commons_${scala.binary.version}</artifactId>
<version>${commons.version}</version>
</dependency>

<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
Expand Down
6 changes: 3 additions & 3 deletions atum/src/main/scala/za/co/absa/atum/utils/InfoFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.AtumImplicits.{DefaultControlInfoLoader, DefaultControlInfoStorer, StringPathExt}
import za.co.absa.atum.location.S3Location.StringS3LocationExt
import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt

object InfoFile {
/**
Expand All @@ -37,11 +37,11 @@ object InfoFile {
def convertFullPathToFsAndRelativePath(fullPath: String)(implicit hadoopConfiguration: Configuration): (FileSystem, Path) = {
val sanitizedFullPath = fullPath.replaceAll("[\\*\\?]", "")

sanitizedFullPath.toS3Location match {
sanitizedFullPath.toSimpleS3Location match {

case Some(s3Location) =>
// this is S3 over hadoop FS API, not SDK S3 approach
val s3Uri = new URI(s3Location.s3String) // s3://<bucket>
val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>

val fs = FileSystem.get(s3Uri, hadoopConfiguration)
Expand Down
8 changes: 8 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@
<version>${project.version}</version>
</dependency>

<!-- Commons -->
<dependency>
<groupId>za.co.absa.commons</groupId>
<artifactId>commons_${scala.binary.version}</artifactId>
<version>${commons.version}</version>
<scope>test</scope>
</dependency>

<!-- Scalatest is added as a provided scope so it won't be included to the uber jar -->
<dependency>
<groupId>org.scalatest</groupId>
Expand Down
63 changes: 0 additions & 63 deletions model/src/main/scala/za/co/absa/atum/location/S3Location.scala

This file was deleted.

64 changes: 0 additions & 64 deletions model/src/test/scala/za/co/absa/atum/location/S3LocationSpec.scala

This file was deleted.

1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
<specs.version>2.5</specs.version>
<aws.java.sdk.version>2.13.65</aws.java.sdk.version>
<mockito.scala.version>1.15.0</mockito.scala.version>
<commons.version>0.0.27</commons.version>

<!-- Spark versions -->
<spark-24.version>2.4.6</spark-24.version>
Expand Down

0 comments on commit 62eea2b

Please sign in to comment.