#1894 HadoopFsPersistenceFactory - adding Spline S3 write support #1912

Merged
spark-jobs/src/main/resources/spline.properties.template (2 changes: 1 addition & 1 deletion)
@@ -20,7 +20,7 @@
# Set of properties for setting up persistence to MongoDB.
#
spline.persistence.factory=za.co.absa.spline.persistence.api.composition.ParallelCompositeFactory
-spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory
+spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.enceladus.spline.persistence.HadoopFsPersistenceFactory

spline.mongodb.url=mongodb://localhost:27017
spline.mongodb.name=spline
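For context, the new factory also reads two optional keys, defined in HadoopFsPersistenceFactory further down. A sketch with illustrative values ("_LINEAGE" is the coded default; "644" is only an example, the real default being derived from the FS umask):

# Optional keys read by HadoopFsPersistenceFactory (values illustrative)
spline.hdfs.file.name=_LINEAGE
spline.hdfs.file.permissions=644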
New file (99 additions)
@@ -0,0 +1,99 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.spline.persistence

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.{FileSystem, Path}
import org.slf4s.Logging
import za.co.absa.spline.common.ARM._
import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt
import za.co.absa.spline.model.DataLineage
import za.co.absa.spline.model.op.Write
import za.co.absa.spline.persistence.api.DataLineageWriter
import za.co.absa.spline.persistence.hdfs.serialization.JSONSerialization

import scala.concurrent.{ExecutionContext, Future, blocking}

/**
* The class represents a persistence layer that persists to Hadoop FS. Based on
* [[za.co.absa.spline.persistence.hdfs.HdfsDataLineageWriter]].
*/
class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: String, filePermissions: FsPermission)
extends DataLineageWriter with Logging {
/**
* The method stores a particular data lineage to the persistence layer.
*
* @param lineage A data lineage that will be stored
*/
override def store(lineage: DataLineage)(implicit ec: ExecutionContext): Future[Unit] = Future {
val pathOption = getPath(lineage)
import JSONSerialization._
for (path <- pathOption) {
[Review comment, Collaborator]: Ha, interesting construction, a map replacement

[Reply]: foreach replacement to be precise.
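// Editorial aside (not part of the diff): a for-comprehension without `yield`
// desugars to foreach, so this loop is equivalent to
//   pathOption.foreach { path => persistToHadoopFs(lineage.toJson, path.toUri.toString) }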

val content = lineage.toJson
persistToHadoopFs(content, path.toUri.toString)
}
}

/**
* Converts a full path string to a Hadoop FS and Path pair, e.g.
* `s3://mybucket1/path/to/file` -> S3 FS + `/path/to/file`
* `/path/on/hdfs/to/file` -> local HDFS + `/path/on/hdfs/to/file`
*
* Note that non-local HDFS paths are not supported by this method, e.g. `hdfs://nameservice123:8020/path/on/hdfs/too`.
*
* @param pathString path to convert to FS and relative path
* @return FS + relative path
*/
def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
pathString.toSimpleS3Location match {
case Some(s3Location) =>
val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>

val fs = FileSystem.get(s3Uri, hadoopConfiguration)
(fs, s3Path)

case None => // local hdfs location
val fs = FileSystem.get(hadoopConfiguration)
(fs, new Path(pathString))
}
}
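// Editorial usage sketch (not part of the diff); the concrete FileSystem
// implementation depends on the active Hadoop configuration:
//   pathStringToFsWithPath("s3://mybucket1/path/to/file") // -> (S3 filesystem, Path("/path/to/file"))
//   pathStringToFsWithPath("/path/on/hdfs/to/file")       // -> (default filesystem, Path("/path/on/hdfs/to/file"))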

private def persistToHadoopFs(content: String, hadoopPath: String): Unit = blocking {
val (fs, path) = pathStringToFsWithPath(hadoopPath)
log debug s"Writing lineage to $path"

using(fs.create(
path,
filePermissions,
true, // overwrite an existing file
hadoopConfiguration.getInt("io.file.buffer.size", 4096), // scalastyle:ignore magic.number
fs.getDefaultReplication(path),
fs.getDefaultBlockSize(path),
null)) { // scalastyle:ignore null - no Progressable callback
_.write(content.getBytes)
}
}

private def getPath(lineage: DataLineage): Option[Path] =
lineage.rootOperation match {
case writeOp: Write => Some(new Path(writeOp.path, fileName))
case _ => None
}
}
New file (63 additions)
@@ -0,0 +1,63 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.spline.persistence

import org.apache.commons.configuration.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.SparkContext
import za.co.absa.spline.persistence.api._

/**
* The class represents a persistence layer factory for Hadoop FS (including AWS S3). Based on
* [[za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory]].
*/
object HadoopFsPersistenceFactory {
private val fileNameKey = "spline.hdfs.file.name"
[Review comment, @wajda, Sep 14, 2021]: I think this was copied from the old Spline, and I'm not sure what naming conventions for constants are set in Enceladus, but I remember there was a conversation on CQC some time ago - constant names should generally be written in upper camel case - https://docs.scala-lang.org/style/naming-conventions.html#constants-values-and-variables

[Reply, Collaborator]: Usually we adhere to capital letters too. But not very pedantic in code reviews about it. 😉

private val filePermissionsKey = "spline.hdfs.file.permissions"
}

/**
* The class represents a factory creating Hadoop FS persistence layers for all main data lineage entities.
*
* @param configuration A source of settings
*/
class HadoopFsPersistenceFactory(configuration: Configuration) extends PersistenceFactory(configuration) {

import HadoopFsPersistenceFactory._

private val hadoopConfiguration = SparkContext.getOrCreate().hadoopConfiguration
private val fileName = configuration.getString(fileNameKey, "_LINEAGE")
private val defaultFilePermissions = FsPermission.getFileDefault
.applyUMask(FsPermission.getUMask(FileSystem.get(hadoopConfiguration).getConf))
private val filePermissions = new FsPermission(configuration.getString(filePermissionsKey, defaultFilePermissions.toShort.toString))

log.info(s"Lineage destination path: $fileName")

/**
* The method creates a persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity.
*
* @return A persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity
*/
override def createDataLineageWriter: DataLineageWriter = new HadoopFsDataLineageWriter(hadoopConfiguration, fileName, filePermissions)

/**
* The method creates a reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity.
*
* @return An optional reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity
*/
override def createDataLineageReader: Option[DataLineageReader] = None
}
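For reference, a minimal sketch of how the factory is exercised (an editorial illustration, not part of the diff: Spline normally instantiates the factory reflectively from spline.persistence.composition.factories, and the properties file name below is hypothetical):

import org.apache.commons.configuration.PropertiesConfiguration

// The factory obtains its Hadoop configuration via SparkContext.getOrCreate(),
// so this is expected to run inside a Spark application.
val conf = new PropertiesConfiguration("spline.properties") // hypothetical file name
val factory = new HadoopFsPersistenceFactory(conf)
val writer = factory.createDataLineageWriter // writes a "_LINEAGE" file into each Write operation's output path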