#48 no storer write fix - hadoopfs default storer is used
 - hdfs test enabled for build, while s3 ignored
 - readme update
dk1844 committed Dec 3, 2020
1 parent c7aba25 commit 601c217
Showing 4 changed files with 31 additions and 7 deletions.
28 changes: 24 additions & 4 deletions README.md
@@ -160,7 +160,7 @@ object ExampleSparkJob {
import spark.implicits._

// implicit FS is needed for enableControlMeasuresTracking, setCheckpoint calls, e.g. standard HDFS here:
-implicit val localHdfs = FileSystem.get(new Configuration)
+implicit val localHdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Initializing library to hook up to Apache Spark
spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/_INFO")
@@ -188,8 +188,28 @@ in 'data/input/_INFO'. Two checkpoints are created. Any business logic can be in
and saving it to Parquet format.

### Storing Measurements in AWS S3
-Starting with version 3.0.0, persistence support for AWS S3 has been added.
-AWS S3 can be both used for loading the measurement data from as well as saving the measurements back to.
+
+#### AWS S3 via Hadoop FS API
+Since version 3.1.0, persistence support for AWS S3 via the Hadoop FS API is available. The usage is the same as with
+regular HDFS, except that a different file system is provided, e.g.:
+```scala
+import java.net.URI
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession
+  .builder()
+  .appName("Example Spark Job")
+  .getOrCreate()
+
+val s3Uri = new URI("s3://my-awesome-bucket")
+implicit val fs = FileSystem.get(s3Uri, spark.sparkContext.hadoopConfiguration)
+
+```
+The rest of the usage is the same as in the example listed above.
+
+#### AWS S3 via AWS SDK for S3
+Starting with version 3.0.0, there is also persistence support for AWS S3 via the AWS SDK for S3.

The following example demonstrates the setup:
```scala
@@ -230,7 +250,7 @@ object S3Example {
}

```
-The rest of the processing logic and programatic approach to the library remains unchanged.
+The rest of the processing logic and programmatic approach to the library remains unchanged.
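The body of `S3Example` is collapsed in this diff view. For orientation only, here is a minimal, hypothetical sketch of what persisting a measurements file through the AWS SDK for S3 looks like mechanically (assuming the AWS SDK for Java v2; the bucket, key, and payload below are made up, and this is not the library's own API):
```scala
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.PutObjectRequest

object SdkS3WriteSketch {
  def main(args: Array[String]): Unit = {
    // Plain SDK v2 client; credentials are resolved from the default provider chain.
    val s3 = S3Client.builder().region(Region.EU_WEST_1).build()

    // Upload a (made-up) measurements JSON directly, bypassing the Hadoop FS layer.
    val request = PutObjectRequest.builder()
      .bucket("my-awesome-bucket") // hypothetical bucket
      .key("data/output/_INFO")    // hypothetical key
      .build()
    s3.putObject(request, RequestBody.fromString("""{"checkpoints": []}"""))

    s3.close()
  }
}
```
This is the key contrast with the Hadoop FS API route above: the SDK client talks to S3 directly instead of going through a `FileSystem` implementation.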


## Atum library routines
@@ -44,7 +44,9 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecutionListener {
writeInfoFileForQuery(qe)(hadoopStorer.outputFs)

case _ =>
Atum.log.info("No usable storer is set, therefore no data will be written the automatically with DF-save to an _INFO file.")
Atum.log.info("No storer is set, using default HadoopFs-based bound with DF-save to an inferred _INFO file path.")
val defaultFs = FileSystem.get(qe.sparkSession.sparkContext.hadoopConfiguration)
writeInfoFileForQuery(qe)(defaultFs)
}

// Notify listeners
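In practice, this fallback means a job that never registers an explicit storer still gets its _INFO file written when a DataFrame is saved. A minimal sketch of that scenario, using only calls that appear in this commit (paths and the checkpoint name are illustrative, and the `AtumImplicits` import is assumed):
```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession
import za.co.absa.atum.AtumImplicits._

object DefaultStorerFallback {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Default storer fallback").getOrCreate()

    // Implicit FS is needed for the enableControlMeasuresTracking / setCheckpoint calls.
    implicit val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    // Note: no storer is registered anywhere in this job.
    spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/_INFO")

    val df = spark.read.parquet("data/input")
      .setCheckpoint("afterLoad") // illustrative checkpoint name

    // Before this commit, the listener only logged that no usable storer was set;
    // now it falls back to the default Hadoop FileSystem and writes the _INFO file
    // to a path inferred from the save destination.
    df.write.parquet("data/output")
  }
}
```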
2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -75,7 +75,7 @@
<artifactId>scalatest-maven-plugin</artifactId>
<version>${scalatest.maven.version}</version>
<configuration>
<skipTests>true</skipTests>
<skipTests>false</skipTests>
</configuration>
</plugin>
<!-- Uber jar generation -->
@@ -15,10 +15,12 @@

package za.co.absa.atum.examples

+import org.scalatest.Ignore
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.atum.utils._

-class SampleMeasurementsS3RunnerSpec extends AnyFunSuite
+@Ignore
+class SampleMeasurementsS3RunnerExampleSpec extends AnyFunSuite
with SparkJobRunnerMethods
with SparkLocalMaster {

