From 50957f8526b59d056fef0233df4f18c44b79ad8a Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Thu, 9 Sep 2021 15:29:12 +0200 Subject: [PATCH 1/6] #1894 Spline S3 support via custom persistence factory `HadoopFsPersistenceFactory`. --- .../HadoopFsDataLineageWriter.scala | 98 +++++++++++++++++++ .../HadoopFsPersistenceFactory.scala | 61 ++++++++++++ .../serialization/JSONSerialization.scala | 31 ++++++ 3 files changed, 190 insertions(+) create mode 100644 spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala create mode 100644 spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala create mode 100644 spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala new file mode 100644 index 000000000..8b2f31251 --- /dev/null +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2017 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.spline.persistence + +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.fs.{FileSystem, Path} +import org.slf4s.Logging +import za.co.absa.spline.common.ARM._ +import za.co.absa.atum.location.S3Location.StringS3LocationExt +import za.co.absa.spline.model.DataLineage +import za.co.absa.spline.model.op.Write +import za.co.absa.spline.persistence.api.DataLineageWriter +import za.co.absa.spline.persistence.hdfs.serialization.JSONSerialization + +import scala.concurrent.{ExecutionContext, Future, blocking} + +/** + * The class represents persistence layer that persists the [[za.co.absa.spline.model.DataLineage DataLineage]] entity to a file on HDFS. + */ +class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: String, filePermissions: FsPermission) extends DataLineageWriter with Logging { + /** + * The method stores a particular data lineage to the persistence layer. + * + * @param lineage A data lineage that will be stored + */ + override def store(lineage: DataLineage)(implicit ec: ExecutionContext): Future[Unit] = Future { + val pathOption = getPath(lineage) + import JSONSerialization._ + for (path <- pathOption) { + val content = lineage.toJson + persistToHadoopFs(content, path.toUri.toString) + } + } + + /** + * Converts string full path to Hadoop FS and Path, e.g. + * `s3://mybucket1/path/to/file` -> S3 FS + `path/to/file` + * `/path/on/hdfs/to/file` -> local HDFS + `/path/on/hdfs/to/file` + * + * Note, that non-local HDFS paths are not supported in this method, e.g. hdfs://nameservice123:8020/path/on/hdfs/too. + * + * @param pathString path to convert to FS and relative path + * @return FS + relative path + **/ + def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = { + pathString.toS3Location match { + case Some(s3Location) => + val s3Uri = new URI(s3Location.s3String) // s3:// + val s3Path = new Path(s"/${s3Location.path}") // / + + val fs = FileSystem.get(s3Uri, hadoopConfiguration) + (fs, s3Path) + + case None => // local hdfs location + val fs = FileSystem.get(hadoopConfiguration) + (fs, new Path(pathString)) + } + } + + private def persistToHadoopFs(content: String, hadoopPath: String): Unit = blocking { + val (fs, path) = pathStringToFsWithPath(hadoopPath) + log debug s"Writing lineage to $path" + + using(fs.create( + path, + filePermissions, + true, + hadoopConfiguration.getInt("io.file.buffer.size", 4096), + fs.getDefaultReplication(path), + fs.getDefaultBlockSize(path), + null)) { + _.write(content.getBytes) + } + } + + private def getPath(lineage: DataLineage): Option[Path] = + lineage.rootOperation match { + case dn: Write => Some(new Path(dn.path, fileName)) + case _ => None + } +} diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala new file mode 100644 index 000000000..a934eb6a4 --- /dev/null +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2018-2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.spline.persistence + +import org.apache.commons.configuration.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark.SparkContext +import za.co.absa.spline.persistence.api._ + +/** + * The object contains static information about settings needed for initialization of the HdfsPersistenceWriterFactory class. + */ +object HadoopFsPersistenceFactory { + val fileNameKey = "spline.hdfs.file.name" + val filePermissionsKey = "spline.hdfs.file.permissions" +} + +/** + * The class represents a factory creating HDFS persistence layers for all main data lineage entities. + * + * @param configuration A source of settings + */ +class HadoopFsPersistenceFactory(configuration: Configuration) extends PersistenceFactory(configuration) { + + import HadoopFsPersistenceFactory._ + + private val hadoopConfiguration = SparkContext.getOrCreate().hadoopConfiguration + private val fileName = configuration.getString(fileNameKey, "_LINEAGE") + private val defaultFilePermissions = FsPermission.getFileDefault.applyUMask(FsPermission.getUMask(FileSystem.get(hadoopConfiguration).getConf)) + private val filePermissions = new FsPermission(configuration.getString(filePermissionsKey, defaultFilePermissions.toShort.toString)) + + log.info(s"Lineage destination path: $fileName") + + /** + * The method creates a persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity. + * + * @return A persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity + */ + override def createDataLineageWriter: DataLineageWriter = new HadoopFsDataLineageWriter(hadoopConfiguration, fileName, filePermissions) + + /** + * The method creates a reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity. + * + * @return An optional reader from the persistence layer for the [[za.co.absa.spline.model.DataLineage DataLineage]] entity + */ + override def createDataLineageReader: Option[DataLineageReader] = None +} diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala new file mode 100644 index 000000000..f23607a42 --- /dev/null +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2017 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.spline.persistence.serialization + +import org.json4s._ +import org.json4s.jackson.Serialization +import org.json4s.jackson.Serialization.write + +object JSONSerialization { + + implicit val formats: Formats = Serialization.formats(NoTypeHints) ++ org.json4s.ext.JavaTypesSerializers.all + + implicit class EntityToJson[T <: AnyRef](entity: T) { + def toJson: String = write(entity) + } + +} From 87fa4f62fb8b9a026ff7709c9f2dcd75fbc32977 Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Mon, 13 Sep 2021 12:16:50 +0200 Subject: [PATCH 2/6] #1894 licence fix --- .../spline/persistence/HadoopFsDataLineageWriter.scala | 2 +- .../spline/persistence/HadoopFsPersistenceFactory.scala | 2 +- .../spline/persistence/serialization/JSONSerialization.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala index 8b2f31251..e731d63b7 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright 2017 ABSA Group Limited + * Copyright 2018 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala index a934eb6a4..d9f0be422 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 ABSA Group Limited + * Copyright 2018 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala index f23607a42..ff9a878fc 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala @@ -1,5 +1,5 @@ /* - * Copyright 2017 ABSA Group Limited + * Copyright 2018 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 717b928e8eff79f8038de7afde1818665b25cec2 Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Tue, 14 Sep 2021 09:39:06 +0200 Subject: [PATCH 3/6] #1894 PR review updates (code style, visibility, excessive JSONSerialization is linked from Spline, ... ) --- .../HadoopFsDataLineageWriter.scala | 12 ++++--- .../HadoopFsPersistenceFactory.scala | 12 ++++--- .../serialization/JSONSerialization.scala | 31 ------------------- 3 files changed, 14 insertions(+), 41 deletions(-) delete mode 100644 spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala index e731d63b7..2d04fe63f 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala @@ -32,9 +32,11 @@ import za.co.absa.spline.persistence.hdfs.serialization.JSONSerialization import scala.concurrent.{ExecutionContext, Future, blocking} /** - * The class represents persistence layer that persists the [[za.co.absa.spline.model.DataLineage DataLineage]] entity to a file on HDFS. + * The class represents persistence layer that persists on Hadoop FS. Based on + * [[za.co.absa.spline.persistence.hdfs.HdfsDataLineageWriter]]. */ -class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: String, filePermissions: FsPermission) extends DataLineageWriter with Logging { +class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: String, filePermissions: FsPermission) + extends DataLineageWriter with Logging { /** * The method stores a particular data lineage to the persistence layer. * @@ -82,17 +84,17 @@ class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: St path, filePermissions, true, - hadoopConfiguration.getInt("io.file.buffer.size", 4096), + hadoopConfiguration.getInt("io.file.buffer.size", 4096), // scalastyle:ignore magic.number fs.getDefaultReplication(path), fs.getDefaultBlockSize(path), - null)) { + null)) { // scalastyle:ignore null _.write(content.getBytes) } } private def getPath(lineage: DataLineage): Option[Path] = lineage.rootOperation match { - case dn: Write => Some(new Path(dn.path, fileName)) + case writeOp: Write => Some(new Path(writeOp.path, fileName)) case _ => None } } diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala index d9f0be422..d20e37540 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsPersistenceFactory.scala @@ -22,15 +22,16 @@ import org.apache.spark.SparkContext import za.co.absa.spline.persistence.api._ /** - * The object contains static information about settings needed for initialization of the HdfsPersistenceWriterFactory class. + * The class represents persistence layer factory for Hadoop FS (inluding AWS S3). Based on + * * [[za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory]]. */ object HadoopFsPersistenceFactory { - val fileNameKey = "spline.hdfs.file.name" - val filePermissionsKey = "spline.hdfs.file.permissions" + private val fileNameKey = "spline.hdfs.file.name" + private val filePermissionsKey = "spline.hdfs.file.permissions" } /** - * The class represents a factory creating HDFS persistence layers for all main data lineage entities. + * The class represents a factory creating Hadoop FS persistence layers for all main data lineage entities. * * @param configuration A source of settings */ @@ -40,7 +41,8 @@ class HadoopFsPersistenceFactory(configuration: Configuration) extends Persisten private val hadoopConfiguration = SparkContext.getOrCreate().hadoopConfiguration private val fileName = configuration.getString(fileNameKey, "_LINEAGE") - private val defaultFilePermissions = FsPermission.getFileDefault.applyUMask(FsPermission.getUMask(FileSystem.get(hadoopConfiguration).getConf)) + private val defaultFilePermissions = FsPermission.getFileDefault + .applyUMask(FsPermission.getUMask(FileSystem.get(hadoopConfiguration).getConf)) private val filePermissions = new FsPermission(configuration.getString(filePermissionsKey, defaultFilePermissions.toShort.toString)) log.info(s"Lineage destination path: $fileName") diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala deleted file mode 100644 index ff9a878fc..000000000 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/serialization/JSONSerialization.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.enceladus.spline.persistence.serialization - -import org.json4s._ -import org.json4s.jackson.Serialization -import org.json4s.jackson.Serialization.write - -object JSONSerialization { - - implicit val formats: Formats = Serialization.formats(NoTypeHints) ++ org.json4s.ext.JavaTypesSerializers.all - - implicit class EntityToJson[T <: AnyRef](entity: T) { - def toJson: String = write(entity) - } - -} From 83ace7ec52bf1cde39599d18c526669207c10f96 Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Tue, 14 Sep 2021 09:42:24 +0200 Subject: [PATCH 4/6] #1894 PR review updates: `za.co.absa.enceladus.spline.persistence.HadoopFsPersistenceFactory` added to the spline.properties.template --- spark-jobs/src/main/resources/spline.properties.template | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-jobs/src/main/resources/spline.properties.template b/spark-jobs/src/main/resources/spline.properties.template index a18c6a1b6..91e65e128 100644 --- a/spark-jobs/src/main/resources/spline.properties.template +++ b/spark-jobs/src/main/resources/spline.properties.template @@ -20,7 +20,7 @@ # Set of properties for setting up persistence to MongoDB. # spline.persistence.factory=za.co.absa.spline.persistence.api.composition.ParallelCompositeFactory -spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory +spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.enceladus.spline.persistence.HadoopFsPersistenceFactory spline.mongodb.url=mongodb://localhost:27017 spline.mongodb.name=spline From 67d547ac6267a6d36a6b25f6566d9ec79dd8fffe Mon Sep 17 00:00:00 2001 From: Daniel Kavan Date: Thu, 16 Sep 2021 11:09:12 +0200 Subject: [PATCH 5/6] #1894 buildfix after merge with develop having (atum 3.5+) - based on release notes howto https://github.com/AbsaOSS/atum/releases/tag/v3.5.0 --- .../spline/persistence/HadoopFsDataLineageWriter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala index 2d04fe63f..7354ed2de 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.fs.{FileSystem, Path} import org.slf4s.Logging import za.co.absa.spline.common.ARM._ -import za.co.absa.atum.location.S3Location.StringS3LocationExt +import za.co.absa.commons.s3.SimpleS3Location.SimpleS3LocationExt import za.co.absa.spline.model.DataLineage import za.co.absa.spline.model.op.Write import za.co.absa.spline.persistence.api.DataLineageWriter @@ -62,9 +62,9 @@ class HadoopFsDataLineageWriter(hadoopConfiguration: Configuration, fileName: St * @return FS + relative path **/ def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = { - pathString.toS3Location match { + pathString.toSimpleS3Location match { case Some(s3Location) => - val s3Uri = new URI(s3Location.s3String) // s3:// + val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3:// val s3Path = new Path(s"/${s3Location.path}") // / val fs = FileSystem.get(s3Uri, hadoopConfiguration) From ad59799593f382e782214121e8cfb89ddef22654 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 17 Sep 2021 18:49:30 +0200 Subject: [PATCH 6/6] Blank line removal - Update spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com> --- .../enceladus/spline/persistence/HadoopFsDataLineageWriter.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala index 7354ed2de..ead0ee4e6 100644 --- a/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala +++ b/spark-jobs/src/main/scala/za/co/absa/enceladus/spline/persistence/HadoopFsDataLineageWriter.scala @@ -4,7 +4,6 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software