Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

#1894 HadoopFsPersistenceFactory - adding Spline S3 write support #1912

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, this empty line makes Scalastyle fail my build.

* http://www.apache.org/licenses/LICENSE-2.0
dk1844 marked this conversation as resolved.
Show resolved Hide resolved
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.spline.persistence

import java.net.URI
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.{FileSystem, Path}
import org.slf4s.Logging
import za.co.absa.atum.location.S3Location.StringS3LocationExt
import za.co.absa.spline.common.ARM._
import za.co.absa.spline.model.DataLineage
import za.co.absa.spline.model.op.Write
import za.co.absa.spline.persistence.api.DataLineageWriter
import za.co.absa.spline.persistence.hdfs.serialization.JSONSerialization

import scala.concurrent.{ExecutionContext, Future, blocking}

/**
 * Persistence layer that stores a [[za.co.absa.spline.model.DataLineage DataLineage]] entity
 * as a JSON file on a Hadoop-compatible file system (local HDFS or S3).
 *
 * @param hadoopConfiguration Hadoop configuration used to resolve the target file system
 * @param fileName            name of the lineage file, created under the write operation's output path
 * @param filePermissions     permissions applied to the created lineage file
 */
class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: String, filePermissions: FsPermission) extends DataLineageWriter with Logging {
  /**
   * The method stores a particular data lineage to the persistence layer.
   *
   * @param lineage A data lineage that will be stored
   */
  override def store(lineage: DataLineage)(implicit ec: ExecutionContext): Future[Unit] = Future {
    val pathOption = getPath(lineage)
    import JSONSerialization._
    // No root write operation means there is nowhere to persist the lineage — silently a no-op.
    for (path <- pathOption) {
      val content = lineage.toJson
      persistToHadoopFs(content, path.toUri.toString)
    }
  }

  /**
   * Converts string full path to Hadoop FS and Path, e.g.
   * `s3://mybucket1/path/to/file` -> S3 FS + `/path/to/file`
   * `/path/on/hdfs/to/file` -> local HDFS + `/path/on/hdfs/to/file`
   *
   * Note, that non-local HDFS paths are not supported in this method, e.g. hdfs://nameservice123:8020/path/on/hdfs/too.
   *
   * @param pathString path to convert to FS and relative path
   * @return FS + relative path
   */
  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
    // `toS3Location` is an implicit extension from Atum (moves to commons in Atum 3.5+);
    // it yields Some(...) only for s3/s3a/s3n-style URIs.
    pathString.toS3Location match {
      case Some(s3Location) =>
        val s3Uri = new URI(s3Location.s3String) // s3://<bucket>
        val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>

        val fs = FileSystem.get(s3Uri, hadoopConfiguration)
        (fs, s3Path)

      case None => // local hdfs location
        val fs = FileSystem.get(hadoopConfiguration)
        (fs, new Path(pathString))
    }
  }

  /**
   * Writes `content` to `hadoopPath`, overwriting any existing file.
   * Runs inside `blocking` because Hadoop FS calls are blocking I/O.
   */
  private def persistToHadoopFs(content: String, hadoopPath: String): Unit = blocking {
    val (fs, path) = pathStringToFsWithPath(hadoopPath)
    log.debug(s"Writing lineage to $path")

    using(fs.create(
      path,
      filePermissions,
      true, // overwrite: replace a lineage file left over from a previous run
      hadoopConfiguration.getInt("io.file.buffer.size", 4096),
      fs.getDefaultReplication(path),
      fs.getDefaultBlockSize(path),
      null)) { // no Progressable: progress reporting is not needed for this small write
      // Fix: use an explicit UTF-8 charset instead of the platform default,
      // so the JSON file encoding does not depend on the JVM's default locale settings.
      _.write(content.getBytes(StandardCharsets.UTF_8))
    }
  }

  /** Derives the lineage file location from the lineage's root write operation, if present. */
  private def getPath(lineage: DataLineage): Option[Path] =
    lineage.rootOperation match {
      // Renamed `dn` -> `writeOperation` for readability (raised in code review).
      case writeOperation: Write => Some(new Path(writeOperation.path, fileName))
      case _ => None
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.spline.persistence

import org.apache.commons.configuration.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.SparkContext
import za.co.absa.spline.persistence.api._

/**
 * Static configuration keys required to initialize the HdfsPersistenceWriterFactory class.
 */
object HadoopFsPersistenceFactory {
  // Configuration key for the name of the lineage file to create.
  val fileNameKey: String = "spline.hdfs.file.name"
  // Configuration key for the permissions applied to the lineage file.
  val filePermissionsKey: String = "spline.hdfs.file.permissions"
}

/**
 * The class represents a factory creating HDFS persistence layers for all main data lineage entities.
 *
 * @param configuration A source of settings
 */
class HadoopFsPersistenceFactory(configuration: Configuration) extends PersistenceFactory(configuration) {

  import HadoopFsPersistenceFactory._

  private val hadoopConfiguration = SparkContext.getOrCreate().hadoopConfiguration

  // Lineage file name; defaults to "_LINEAGE" when not configured.
  private val fileName = configuration.getString(fileNameKey, "_LINEAGE")

  // Effective file permissions: either taken from configuration, or derived from
  // the FS default permission with the configured umask applied.
  private val filePermissions: FsPermission = {
    val fsConf = FileSystem.get(hadoopConfiguration).getConf
    val umaskedDefault = FsPermission.getFileDefault.applyUMask(FsPermission.getUMask(fsConf))
    new FsPermission(configuration.getString(filePermissionsKey, umaskedDefault.toShort.toString))
  }

  log.info(s"Lineage destination path: $fileName")

  /**
   * The method creates a persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity.
   *
   * @return A persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity
   */
  override def createDataLineageWriter: DataLineageWriter = new HadoopFsDataLineageWriter(hadoopConfiguration, fileName, filePermissions)

  /**
   * The method creates a reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity.
   *
   * @return An optional reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity
   */
  override def createDataLineageReader: Option[DataLineageReader] = None
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.spline.persistence.serialization

import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write

/**
 * JSON serialization support for lineage entities, based on json4s/Jackson.
 */
object JSONSerialization {

  // json4s formats without type hints, extended with serializers for common Java types (e.g. UUID).
  implicit val formats: Formats = Serialization.formats(NoTypeHints) ++ org.json4s.ext.JavaTypesSerializers.all

  /** Enriches any reference type with a `toJson` method using the implicit formats above. */
  implicit class EntityToJson[T <: AnyRef](entity: T) {
    def toJson: String = write(entity)
  }

}