#475 Add Delta Lake location or managed table as a bookkeeping storage option #478

Merged
7 commits merged on Aug 28, 2024
68 changes: 68 additions & 0 deletions README.md
@@ -2243,6 +2243,74 @@ You specify:

Let's take a look at an example based on the Enceladus sink.

## Bookkeeping

To support auto-recovery from failures, schema tracking, and other advanced features, Pramen requires a database
or a storage location for keeping the state of the pipeline.

### PostgreSQL database (recommended)
This is the recommended way of storing bookkeeping data since it is the most efficient and feature-rich option.

Configuration:
```hocon
pramen {
bookkeeping.enabled = "true"

bookkeeping.jdbc {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://host:5433/pramen"
user = "username"
password = "password"
}
}
```

### MongoDB database
Here is how you can use a MongoDB database for storing bookkeeping information:

```hocon
pramen {
bookkeeping.enabled = "true"

bookkeeping.mongodb {
connection.string = "mongodb://aaabbb"
database = "mydb"
}
}
```

### Hadoop (CSV+JSON)
This is a less recommended way since it is quite slow, but it has the advantage that you don't need a database.

```hocon
pramen.bookkeeping {
enabled = "true"
location = "hdfs://path"
}
```

### Delta Lake
This requires Delta Lake format support on the cluster you run your pipelines on.

You can use either a path:
```hocon
pramen.bookkeeping {
enabled = "true"
hadoop.format = "delta"
location = "s3://path"
}
```

or a set of managed tables:
```hocon
pramen.bookkeeping {
enabled = "true"
hadoop.format = "delta"
  delta.database = "my_db" # Optional. 'default' will be used if not specified
delta.table.prefix = "bk_"
}
```
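
The names of the managed tables are derived from the optional `delta.database` and the `delta.table.prefix`. Here is a rough sketch of how such a fully qualified name could be composed (the exact table suffixes are not shown in this pull request, so `"bookkeeping"` below is a hypothetical placeholder):

```scala
// Sketch only: build a fully qualified Delta table name from an optional database,
// a prefix, and a table suffix. The suffix "bookkeeping" is a hypothetical example.
def fullTableName(database: Option[String], prefix: String, suffix: String): String = {
  val table = s"$prefix$suffix"              // e.g. "bk_bookkeeping"
  s"${database.getOrElse("default")}.$table" // e.g. "my_db.bk_bookkeeping"
}

fullTableName(Some("my_db"), "bk_", "bookkeeping") // "my_db.bk_bookkeeping"
fullTableName(None, "bk_", "bookkeeping")          // "default.bk_bookkeeping"
```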

#### Enceladus ingestion pipelines for the Data Lake
Pramen can help with ingesting data for data lake pipelines of [Enceladus](https://github.com/AbsaOSS/enceladus).
A special sink (`EnceladusSink`) is used to save data to Enceladus' raw folder.
@@ -26,7 +26,9 @@ case class BookkeeperConfig(
bookkeepingHadoopFormat: HadoopFormat,
bookkeepingConnectionString: Option[String],
bookkeepingDbName: Option[String],
bookkeepingJdbcConfig: Option[JdbcConfig]
bookkeepingJdbcConfig: Option[JdbcConfig],
deltaDatabase: Option[String],
deltaTablePrefix: Option[String]
)

object BookkeeperConfig {
@@ -36,6 +38,8 @@ object BookkeeperConfig {
val BOOKKEEPING_HADOOP_FORMAT = "pramen.bookkeeping.hadoop.format"
val BOOKKEEPING_CONNECTION_STRING = "pramen.bookkeeping.mongodb.connection.string"
val BOOKKEEPING_DB_NAME = "pramen.bookkeeping.mongodb.database"
val BOOKKEEPING_DELTA_DB_NAME = "pramen.bookkeeping.delta.database"
val BOOKKEEPING_DELTA_TABLE_PREFIX = "pramen.bookkeeping.delta.table.prefix"

def fromConfig(conf: Config): BookkeeperConfig = {
val bookkeepingEnabled = conf.getBoolean(BOOKKEEPING_ENABLED)
@@ -44,14 +48,23 @@ object BookkeeperConfig {
val bookkeepingConnectionString = ConfigUtils.getOptionString(conf, BOOKKEEPING_CONNECTION_STRING)
val bookkeepingDbName = ConfigUtils.getOptionString(conf, BOOKKEEPING_DB_NAME)
val bookkeepingJdbcConfig = JdbcConfig.loadOption(conf.getConfig(BOOKKEEPING_PARENT), BOOKKEEPING_PARENT)
val deltaDatabase = ConfigUtils.getOptionString(conf, BOOKKEEPING_DELTA_DB_NAME)
val deltaTablePrefix = ConfigUtils.getOptionString(conf, BOOKKEEPING_DELTA_TABLE_PREFIX)

    if (bookkeepingEnabled && bookkeepingJdbcConfig.isEmpty && bookkeepingHadoopFormat == HadoopFormat.Delta) {
      if (bookkeepingLocation.isEmpty && deltaTablePrefix.isEmpty) {
        throw new RuntimeException(s"In order to use Delta Lake for bookkeeping, either $BOOKKEEPING_LOCATION or $BOOKKEEPING_DELTA_TABLE_PREFIX must be defined. " +
          s"Preferably $BOOKKEEPING_DELTA_DB_NAME should be defined as well for managed Delta Lake tables.")
      }
    } else {
      if (bookkeepingEnabled && bookkeepingConnectionString.isEmpty && bookkeepingLocation.isEmpty && bookkeepingJdbcConfig.isEmpty) {
        throw new RuntimeException(s"One of the following should be defined: $BOOKKEEPING_PARENT.jdbc.url, $BOOKKEEPING_CONNECTION_STRING or $BOOKKEEPING_LOCATION" +
          s" when bookkeeping is enabled. You can disable bookkeeping by setting $BOOKKEEPING_ENABLED = false.")
      }

      if (bookkeepingConnectionString.isDefined && bookkeepingDbName.isEmpty) {
        throw new RuntimeException(s"Database name is not defined. Please, define $BOOKKEEPING_DB_NAME.")
      }
    }

BookkeeperConfig(
@@ -60,7 +73,9 @@ object BookkeeperConfig {
bookkeepingHadoopFormat,
bookkeepingConnectionString,
bookkeepingDbName,
bookkeepingJdbcConfig
bookkeepingJdbcConfig,
deltaDatabase,
deltaTablePrefix
)
}
}
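
To make the new rule explicit: when bookkeeping is enabled, no JDBC configuration is given, and the Hadoop format is Delta, at least one of the bookkeeping location or the Delta table prefix must be set. A minimal, self-contained sketch of that predicate (not Pramen API):

```scala
// Delta-specific validation rule in isolation: either a location or a table prefix is required.
def deltaBookkeepingIsValid(location: Option[String], tablePrefix: Option[String]): Boolean =
  location.nonEmpty || tablePrefix.nonEmpty

assert(deltaBookkeepingIsValid(Some("s3://path"), None)) // path-based Delta bookkeeping
assert(deltaBookkeepingIsValid(None, Some("bk_")))       // managed Delta tables
assert(!deltaBookkeepingIsValid(None, None))             // fromConfig throws in this case
```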
@@ -20,9 +20,11 @@ trait HadoopFormat

case object HadoopFormat {
case object Text extends HadoopFormat
case object Delta extends HadoopFormat

def apply(format: String): HadoopFormat = format.toLowerCase match {
case "text" => Text
case "delta" => Delta
case _ => throw new IllegalArgumentException(s"Unknown Hadoop format: $format")
}
}
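
For reference, this is how the parser behaves for supported and unsupported format strings (REPL-style usage of the object above):

```scala
HadoopFormat("delta")   // HadoopFormat.Delta
HadoopFormat("TEXT")    // HadoopFormat.Text (matching is case-insensitive)
HadoopFormat("parquet") // throws IllegalArgumentException: Unknown Hadoop format: parquet
```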
@@ -87,8 +87,14 @@ object Bookkeeper {
log.info(s"Using MongoDB for lock management.")
new TokenLockFactoryMongoDb(connection)
case None =>
log.info(s"Using HadoopFS for lock management.")
new TokenLockFactoryHadoop(spark.sparkContext.hadoopConfiguration, bookkeepingConfig.bookkeepingLocation.get + "/locks")
bookkeepingConfig.bookkeepingLocation match {
case Some(path) =>
log.info(s"Using HadoopFS for lock management at $path/locks")
new TokenLockFactoryHadoopPath(spark.sparkContext.hadoopConfiguration, path + "/locks")
case None =>
log.warn(s"Locking is DISABLED.")
new TokenLockFactoryAllow
}
}
}
} else {
@@ -109,8 +115,20 @@ object Bookkeeper {
case None =>
bookkeepingConfig.bookkeepingHadoopFormat match {
case HadoopFormat.Text =>
log.info(s"Using Hadoop (CSV for records, JSON for schemas) for bookkeeping.")
new BookkeeperText(bookkeepingConfig.bookkeepingLocation.get)
val path = bookkeepingConfig.bookkeepingLocation.get
log.info(s"Using Hadoop (CSV for records, JSON for schemas) for bookkeeping at $path")
new BookkeeperText(path)
case HadoopFormat.Delta =>
bookkeepingConfig.deltaTablePrefix match {
case Some(tablePrefix) =>
val fullTableName = BookkeeperDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "*")
log.info(s"Using Delta Lake managed table '$fullTableName' for bookkeeping.")
new BookkeeperDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix)
case None =>
val path = bookkeepingConfig.bookkeepingLocation.get
log.info(s"Using Delta Lake for bookkeeping at $path")
new BookkeeperDeltaPath(path)
}
}
}
}
@@ -127,8 +145,24 @@ object Bookkeeper {
log.info(s"Using MongoDB to keep journal of executed jobs.")
new JournalMongoDb(connection)
case None =>
log.info(s"Using HadoopFS to keep journal of executed jobs.")
new JournalHadoop(bookkeepingConfig.bookkeepingLocation.get + "/journal")
bookkeepingConfig.bookkeepingHadoopFormat match {
case HadoopFormat.Text =>
val path = bookkeepingConfig.bookkeepingLocation.get + "/journal"
log.info(s"Using HadoopFS to keep journal of executed jobs at $path")
new JournalHadoopCsv(path)
case HadoopFormat.Delta =>
bookkeepingConfig.deltaTablePrefix match {
case Some(tablePrefix) =>
val fullTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix)
log.info(s"Using Delta Lake managed table '$fullTableName' for the journal.")
new JournalHadoopDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix)
case None =>
val path = bookkeepingConfig.bookkeepingLocation.get + "/journal"
log.info(s"Using Delta Lake for the journal at $path")
new JournalHadoopDeltaPath(path)
}
}

}
}

@@ -0,0 +1,124 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, Dataset}
import za.co.absa.pramen.core.bookkeeper.model.TableSchemaJson
import za.co.absa.pramen.core.model.{DataChunk, TableSchema}

import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe

abstract class BookkeeperDeltaBase extends BookkeeperHadoop {

def getBkDf(filter: Column): Dataset[DataChunk]

def saveRecordCountDelta(dataChunk: DataChunk): Unit

def getSchemasDeltaDf: Dataset[TableSchemaJson]

def saveSchemaDelta(schemas: TableSchema): Unit

def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit

final override val bookkeepingEnabled: Boolean = true

final override def getLatestProcessedDateFromStorage(tableName: String, until: Option[LocalDate]): Option[LocalDate] = {
val filter = until match {
case Some(endDate) =>
val endDateStr = getDateStr(endDate)
col("tableName") === tableName && col("infoDate") <= endDateStr
case None =>
col("tableName") === tableName
}

val chunks = getBkData(filter)

if (chunks.isEmpty) {
None
} else {
val chunk = chunks.maxBy(_.infoDateEnd)
Option(LocalDate.parse(chunk.infoDateEnd, DataChunk.dateFormatter))
}
}

final override def getLatestDataChunkFromStorage(table: String, dateBegin: LocalDate, dateEnd: LocalDate): Option[DataChunk] = {
getDataChunks(table, dateBegin, dateEnd).lastOption
}

final override def getDataChunksFromStorage(tableName: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[DataChunk] = {
val infoDateFilter = getFilter(tableName, Option(infoDateBegin), Option(infoDateEnd))

getBkData(infoDateFilter)
}

final def getDataChunksCountFromStorage(table: String, dateBegin: Option[LocalDate], dateEnd: Option[LocalDate]): Long = {
getBkDf(getFilter(table, dateBegin, dateEnd)).count()
}

final private[pramen] override def saveRecordCountToStorage(table: String,
infoDate: LocalDate,
infoDateBegin: LocalDate,
infoDateEnd: LocalDate,
inputRecordCount: Long,
outputRecordCount: Long,
jobStarted: Long,
jobFinished: Long): Unit = {
val dateStr = getDateStr(infoDate)
val dateBeginStr = getDateStr(infoDateBegin)
val dateEndStr = getDateStr(infoDateEnd)

val chunk = DataChunk(table, dateStr, dateBeginStr, dateEndStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished)

saveRecordCountDelta(chunk)
}

final override def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] = {
val filter = getFilter(table, None, Option(until))

val df = getSchemasDeltaDf

val tableSchemaOpt = df.filter(filter)
.orderBy(col("infoDate").desc, col("updatedTimestamp").desc)
.take(1)
.headOption

tableSchemaOpt.flatMap(s => {
TableSchema.toSchemaAndDate(TableSchema(s.tableName, s.infoDate, s.schemaJson))
})
}

private[pramen] override def saveSchema(table: String, infoDate: LocalDate, schema: StructType): Unit = {
val tableSchema = TableSchema(table, infoDate.toString, schema.json)

saveSchemaDelta(tableSchema)
}

private[core] def getBkData(filter: Column): Seq[DataChunk] = {
getBkDf(filter)
.collect()
.groupBy(v => (v.tableName, v.infoDate))
.map { case (_, listChunks) =>
listChunks.maxBy(c => c.jobFinished)
}
.toArray[DataChunk]
.sortBy(_.infoDate)
}
}
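
The abstract class above leaves the actual Delta reads and writes to its subclasses. Below is a minimal sketch (not part of this pull request) of how a path-backed subclass could implement the two record operations; the `/bk` sub-path is a hypothetical placeholder for the records location:

```scala
import org.apache.spark.sql.{Column, Dataset, SparkSession}
import za.co.absa.pramen.core.model.DataChunk

object DeltaBookkeepingSketch {
  // getBkDf(filter) could read the Delta table at the records path and apply the filter.
  def readRecords(spark: SparkSession, basePath: String, filter: Column): Dataset[DataChunk] = {
    import spark.implicits._
    spark.read.format("delta").load(s"$basePath/bk").filter(filter).as[DataChunk]
  }

  // saveRecordCountDelta(chunk) could append a single-row dataset to the same location.
  def appendRecord(spark: SparkSession, basePath: String, chunk: DataChunk): Unit = {
    import spark.implicits._
    Seq(chunk).toDS().write.format("delta").mode("append").save(s"$basePath/bk")
  }
}
```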