feat(obfuscate columns udf): Obfuscate columns udf (#447)
golankiviti authored Aug 4, 2021
1 parent 7fdd083 commit bc1e587
Showing 9 changed files with 340 additions and 16 deletions.
20 changes: 20 additions & 0 deletions README.md
@@ -499,6 +499,26 @@ There are some custom functions already implemented as part of the Metorikku JAR
dfName: dfToFill
tableName: tableToFillWith
```
- **ObfuscateColumns:** Obfuscates the given columns of a dataframe. `value` may be `md5`, `sha256`, or any other string, which is then used as a literal replacement; null values are left as-is. For example, hashing with SHA-256:
```yaml
- dataFrameName: resultFrame
classpath: com.yotpo.metorikku.code.steps.obfuscate.ObfuscateColumns
params:
table: table
columns: 'col1,col2,col3'
delimiter: ','
value: sha256
```
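Or, with any other `value`, the columns are replaced with that literal: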

```yaml
- dataFrameName: resultFrame
classpath: com.yotpo.metorikku.code.steps.obfuscate.ObfuscateColumns
params:
table: table
columns: 'col1|col2|col3'
delimiter: '|'
value: '********'
```

#### Apache Hive metastore
Metorikku supports reading and saving tables with Apache hive metastore.
3 changes: 2 additions & 1 deletion build.sbt
@@ -90,7 +90,8 @@ libraryDependencies ++= Seq(
"org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
"com.amazon.deequ" % "deequ" % deequVersion.value excludeAll(excludeSpark, excludeScalanlp),
"org.apache.avro" % "avro" % "1.8.2" % "provided",
"com.databricks" %% "spark-xml" % "0.11.0"
"com.databricks" %% "spark-xml" % "0.11.0",
"com.outr" %% "hasher" % "1.2.2"
)

resolvers ++= Seq(
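The new `hasher` dependency provides the string-hashing implicits used by the ObfuscateColumns step added below; a minimal sketch of the two calls the step relies on:

```scala
import com.roundeights.hasher.Implicits._

// Hex-encoded digests via implicit extensions on String:
val md5Hex = "some value".md5.hex
val sha256Hex = "some value".sha256.hex
```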
5 changes: 5 additions & 0 deletions src/main/scala/com/yotpo/metorikku/code/steps/InputMatcher.scala
@@ -0,0 +1,5 @@
package com.yotpo.metorikku.code.steps

case class InputMatcher[K](ks: K*) {
def unapplySeq[V](m: Map[K, V]): Option[Seq[V]] = if (ks.forall(m.contains)) Some(ks.map(m)) else None
}
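This extractor, now shared by the steps below, matches a params map against a fixed set of required keys; a minimal sketch of how it behaves (map contents are hypothetical):

```scala
import com.yotpo.metorikku.code.steps.InputMatcher

val RequiredParams = InputMatcher("dfName", "tableName")

// Matches when all required keys are present (extra keys are ignored),
// binding the values in the order the keys were declared.
Map("dfName" -> "df", "tableName" -> "t", "other" -> "x") match {
  case RequiredParams(dfName, tableName) => println(s"$dfName -> $tableName") // prints: df -> t
  case _ => println("missing required params")
}
```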
7 changes: 2 additions & 5 deletions src/main/scala/com/yotpo/metorikku/code/steps/LoadIfExists.scala
@@ -9,14 +9,11 @@ object LoadIfExists {
val message = "You need to send 2 parameters with the names of a df and a name of a table to try to load: dfName, tableName"

private val log: Logger = LogManager.getLogger(this.getClass)
private class InputMatcher[K](ks: K*) {
def unapplySeq[V](m: Map[K, V]): Option[Seq[V]] = if (ks.forall(m.contains)) Some(ks.map(m)) else None
}
private val InputMatcher = new InputMatcher("dfName", "tableName")
private val LoadIfExistsInputMatcher = InputMatcher("dfName", "tableName")

def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
params.get match {
case InputMatcher(dfName, tableName) => {
case LoadIfExistsInputMatcher(dfName, tableName) => {
log.info(s"Attempting to load $tableName")

if (ss.catalog.tableExists(tableName)) {
7 changes: 2 additions & 5 deletions src/main/scala/com/yotpo/metorikku/code/steps/SelectiveMerge.scala
@@ -12,15 +12,12 @@ object SelectiveMerge {
private val log: Logger = LogManager.getLogger(this.getClass)
private val colRenameSuffixLength = 10000 // (5 digits)
private val colRenamePrefix = scala.util.Random.nextInt(colRenameSuffixLength).toString
private class InputMatcher[K](ks: K*) {
def unapplySeq[V](m: Map[K, V]): Option[Seq[V]] = if (ks.forall(m.contains)) Some(ks.map(m)) else None
}
private val InputMatcher = new InputMatcher("df1", "df2", "joinKeys")
private val SelectiveMergeInputMatcher = InputMatcher("df1", "df2", "joinKeys")


def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
params.get match {
case InputMatcher(df1Name, df2Name, joinKeysStr) => {
case SelectiveMergeInputMatcher(df1Name, df2Name, joinKeysStr) => {
log.info(s"Selective merging $df1Name into $df2Name using keys $joinKeysStr")
val df1Raw = ss.table(df1Name)
val df2Raw = ss.table(df2Name)
7 changes: 2 additions & 5 deletions src/main/scala/com/yotpo/metorikku/code/steps/ToAvro.scala
@@ -12,14 +12,11 @@ object ToAvro {
val message = "You need to send the following parameters to output to Avro format:" +
"table, schema.registry.url, schema.registry.topic, schema.name, schema.namespace " +
"Will create an entry in the schema registry under: <schema.registry.topic>-value or <schema.registry.topic>-key"
private class InputMatcher[K](ks: K*) {
def unapplySeq[V](m: Map[K, V]): Option[Seq[V]] = if (ks.forall(m.contains)) Some(ks.map(m)) else None
}
private val InputMatcher = new InputMatcher("table", "schema.registry.url", "schema.registry.topic", "schema.name", "schema.namespace")
private val ToAvroInputMatcher = InputMatcher("table", "schema.registry.url", "schema.registry.topic", "schema.name", "schema.namespace")

def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
params.get match {
case InputMatcher(tableName, schemaRegistryUrl, schemaRegistryTopic, schemaName, schemaNamespace) => {
case ToAvroInputMatcher(tableName, schemaRegistryUrl, schemaRegistryTopic, schemaName, schemaNamespace) => {
val dataFrame = ss.table(tableName)

val commonRegistryConfig = Map(
5 changes: 5 additions & 0 deletions src/main/scala/com/yotpo/metorikku/code/steps/obfuscate/ColumnsNotPartOfOriginalSchemaException.scala
@@ -0,0 +1,5 @@
package com.yotpo.metorikku.code.steps.obfuscate

case class ColumnsNotPartOfOriginalSchemaException(columns: Array[String])
extends Exception(s"The following columns are not a part of the original schema and therefore cannot be obfuscated: " +
s"${columns.mkString(", ")}")
51 changes: 51 additions & 0 deletions src/main/scala/com/yotpo/metorikku/code/steps/obfuscate/ObfuscateColumns.scala
@@ -0,0 +1,51 @@
package com.yotpo.metorikku.code.steps.obfuscate

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, udf, when}
import com.roundeights.hasher.Implicits._
import com.yotpo.metorikku.code.steps.InputMatcher
import com.yotpo.metorikku.exceptions.MetorikkuException

object ObfuscateColumns {
private val ObfuscateColumnsInputMatcher = InputMatcher("columns", "delimiter", "value", "table")

def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
val (columns, value, table) = parse(params)
val df = ss.table(table)
val obfuscatedDf = obfuscateColumns(df, columns, value)

obfuscatedDf.createOrReplaceTempView(dataFrameName)
}

private def parse(params: Option[Map[String, String]]): (Array[String], String, String) = {
params.get match {
case ObfuscateColumnsInputMatcher(columns, delimiter, value, table) => (columns.split(delimiter), value, table)
case _ => throw MetorikkuException(
"""Obfuscate Columns expects the following parameters: columns,
delimiter, value, and table. one or more of those wasn't supplied""".stripMargin
)
}
}

def obfuscateColumns(df: DataFrame, columns: Array[String], value: String): DataFrame = {
val columnsInOriginalSchema = df.schema.map(_.name)
val columnsNotInOriginalSchema = columns.filter(col => !columnsInOriginalSchema.contains(col))

if (columnsNotInOriginalSchema.nonEmpty) throw ColumnsNotPartOfOriginalSchemaException(columnsNotInOriginalSchema)

val transform: String => Column = value match {
case "md5" => colName: String => md5Udf(col(colName))
case "sha256" => colName: String => sha256Udf(col(colName))
case _ => colName: String => lit(value)
}
// scalastyle:off null
val transformPreservingNull = (colName: String) => when(col(colName).isNotNull, transform(colName)).otherwise(lit(null))
// scalastyle:on null

columns.foldLeft(df)((dfAcc, column) => dfAcc.withColumn(column, transformPreservingNull(column)))
}

  // UDFs over the hasher string implicits; the when(...) guard above keeps nulls from reaching them.
  private def md5Udf = udf { s: String => s.md5.hex }

  private def sha256Udf = udf { s: String => s.sha256.hex }
}
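A minimal usage sketch of the new step's core function (the local SparkSession and column names are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import com.yotpo.metorikku.code.steps.obfuscate.ObfuscateColumns

val spark = SparkSession.builder.master("local[*]").appName("obfuscate-demo").getOrCreate()
import spark.implicits._

val users = Seq(("a@example.com", "Dana"), (null, "Lee")).toDF("email", "name")

// Hash the email column with SHA-256; nulls stay null, other columns are untouched.
val obfuscated = ObfuscateColumns.obfuscateColumns(users, Array("email"), "sha256")
obfuscated.show(false)
```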