diff --git a/spark-jobs/src/main/resources/spline.properties.template b/spark-jobs/src/main/resources/spline.properties.template index a18c6a1b6..91e65e128 100644 --- a/spark-jobs/src/main/resources/spline.properties.template +++ b/spark-jobs/src/main/resources/spline.properties.template @@ -20,7 +20,7 @@ # Set of properties for setting up persistence to MongoDB. # spline.persistence.factory=za.co.absa.spline.persistence.api.composition.ParallelCompositeFactory -spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory +spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.enceladus.spline.persistence.HadoopFsPersistenceFactory spline.mongodb.url=mongodb://localhost:27017 spline.mongodb.name=spline diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala new file mode 100644 index 000000000..ead0ee4e6 --- /dev/null +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala @@ -0,0 +1,99 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.enceladus.spline.persistence + +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.fs.{FileSystem, Path} +import org.slf4s.Logging +import za.co.absa.spline.common.ARM._ +import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt +import za.co.absa.spline.model.DataLineage +import za.co.absa.spline.model.op.Write +import za.co.absa.spline.persistence.api.DataLineageWriter +import za.co.absa.spline.persistence.hdfs.serialization.JSONSerialization + +import scala.concurrent.{ExecutionContext, Future, blocking} + +/** + * The class represents persistence layer that persists on Hadoop FS. Based on + * [[za.co.absa.spline.persistence.hdfs.HdfsDataLineageWriter]]. + */ +class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: String, filePermissions: FsPermission) + extends DataLineageWriter with Logging { + /** + * The method stores a particular data lineage to the persistence layer. + * + * @param lineage A data lineage that will be stored + */ + override def store(lineage: DataLineage)(implicit ec: ExecutionContext): Future[Unit] = Future { + val pathOption = getPath(lineage) + import JSONSerialization._ + for (path <- pathOption) { + val content = lineage.toJson + persistToHadoopFs(content, path.toUri.toString) + } + } + + /** + * Converts string full path to Hadoop FS and Path, e.g. + * `s3://mybucket1/path/to/file` -> S3 FS + `path/to/file` + * `/path/on/hdfs/to/file` -> local HDFS + `/path/on/hdfs/to/file` + * + * Note, that non-local HDFS paths are not supported in this method, e.g. hdfs://nameservice123:8020/path/on/hdfs/too. 
+ * + * @param pathString path to convert to FS and relative path + * @return FS + relative path + **/ + def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = { + pathString.toSimpleS3Location match { + case Some(s3Location) => + val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3:// + val s3Path = new Path(s"/${s3Location.path}") // / + + val fs = FileSystem.get(s3Uri, hadoopConfiguration) + (fs, s3Path) + + case None => // local hdfs location + val fs = FileSystem.get(hadoopConfiguration) + (fs, new Path(pathString)) + } + } + + private def persistToHadoopFs(content: String, hadoopPath: String): Unit = blocking { + val (fs, path) = pathStringToFsWithPath(hadoopPath) + log debug s"Writing lineage to $path" + + using(fs.create( + path, + filePermissions, + true, + hadoopConfiguration.getInt("io.file.buffer.size", 4096), // scalastyle:ignore magic.number + fs.getDefaultReplication(path), + fs.getDefaultBlockSize(path), + null)) { // scalastyle:ignore null + _.write(content.getBytes) + } + } + + private def getPath(lineage: DataLineage): Option[Path] = + lineage.rootOperation match { + case writeOp: Write => Some(new Path(writeOp.path, fileName)) + case _ => None + } +} diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala new file mode 100644 index 000000000..d20e37540 --- /dev/null +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.spline.persistence + +import org.apache.commons.configuration.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark.SparkContext +import za.co.absa.spline.persistence.api._ + +/** + * The class represents persistence layer factory for Hadoop FS (including AWS S3). Based on + * [[za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory]]. + */ +object HadoopFsPersistenceFactory { + private val fileNameKey = "spline.hdfs.file.name" + private val filePermissionsKey = "spline.hdfs.file.permissions" +} + +/** + * The class represents a factory creating Hadoop FS persistence layers for all main data lineage entities. + * + * @param configuration A source of settings + */ +class HadoopFsPersistenceFactory(configuration: Configuration) extends PersistenceFactory(configuration) { + + import HadoopFsPersistenceFactory._ + + private val hadoopConfiguration = SparkContext.getOrCreate().hadoopConfiguration + private val fileName = configuration.getString(fileNameKey, "_LINEAGE") + private val defaultFilePermissions = FsPermission.getFileDefault + .applyUMask(FsPermission.getUMask(FileSystem.get(hadoopConfiguration).getConf)) + private val filePermissions = new FsPermission(configuration.getString(filePermissionsKey, defaultFilePermissions.toShort.toString)) + + log.info(s"Lineage destination path: $fileName") + + /** + * The method creates a persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity. 
+ * + * @return A persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity + */ + override def createDataLineageWriter: DataLineageWriter = new HadoopFsDataLineageWriter(hadoopConfiguration, fileName, filePermissions) + + /** + * The method creates a reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity. + * + * @return An optional reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity + */ + override def createDataLineageReader: Option[DataLineageReader] = None +}