Merge pull request #13 from data-tools/Spark/Schemas
Spark/Schemas
JavierMonton authored Jan 9, 2021
2 parents 8691f3c + 85e4310 commit 3043e2b
Showing 10 changed files with 387 additions and 39 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
@@ -1,8 +1,12 @@
### Big Data Types 0.0.7
### Big Data Types v0.1.0

- Spark: Added support for Spark Schemas (only Scala 2.12)

### Big Data Types v0.0.7

- BigQuery: TypeClass syntax for case classes instances

### Big Data Types 0.0.6
### Big Data Types v0.0.6

- BigQuery: JavaConverters for cross version builds
- BigQuery: Formats object that allows different keys transformation like CamelCase -> snake_case for fields
45 changes: 42 additions & 3 deletions README.md
@@ -72,11 +72,10 @@ This can be done up to 5 concatenated classes
import com.google.cloud.bigquery.{Field, Schema}
import org.datatools.bigdatatypes.formats.TransformKeys.defaultFormats
import org.datatools.bigdatatypes.bigquery.BigQueryTypes
import scala.jdk.CollectionConverters.IterableHasAsJava

case class MyTable(field1: Int, field2: String)
//List of BigQuery Fields, it can be used to construct a Schema
val fields: List[Field] = BigQueryTypes[MyTable].getBigQueryFields
val fields: List[Field] = BigQueryTypes[MyTable].bigQueryFields
//BigQuery Schema, it can be used to create a table
val schema: Schema = Schema.of(fields.asJava)
```
@@ -95,8 +94,48 @@ val fields: List[Field] = data.getBigQueryFields
See more info about [creating tables on BigQuery](https://cloud.google.com/bigquery/docs/tables#java) in the official documentation
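
For reference, a minimal sketch of creating a table through the `google-cloud-bigquery` client with a `Schema` built as above. The dataset and table names are placeholders and the default-credentials setup is an assumption, not part of this library:
```scala
import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, StandardTableDefinition, TableId, TableInfo}

val bigquery: BigQuery = BigQueryOptions.getDefaultInstance.getService
//schema is the Schema generated with the library, as shown above
val tableDefinition = StandardTableDefinition.of(schema)
//hypothetical dataset and table names
val tableInfo = TableInfo.of(TableId.of("my_dataset", "my_table"), tableDefinition)
bigquery.create(tableInfo)
```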

### Connecting to your BigQuery environment
If you want to create tables using the library you will need to specify a service account and a project id.
If you want to create tables using the library you will need to connect to your BigQuery environment
through any of the GCloud authentication options.
The most common approach is to specify a service account and a project id.
These can be provided as environment variables. The library expects:
- PROJECT_ID: <your_project_id>
- GOOGLE_APPLICATION_CREDENTIAL: <path_to_your_service_account_json_file>
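
A quick sanity check for these variables could look like this; just a sketch, not part of the library:
```scala
//hypothetical pre-flight check for the environment variables listed above
val requiredVars = List("PROJECT_ID", "GOOGLE_APPLICATION_CREDENTIAL")
val missing = requiredVars.filterNot(sys.env.contains)
require(missing.isEmpty, s"Missing environment variables: ${missing.mkString(", ")}")
```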

---

## Spark

### Spark Schema from Case Class

With the Spark module, Spark Schemas can be created from Case Classes.
```scala
import org.apache.spark.sql.types.StructType
import org.datatools.bigdatatypes.spark.SparkTypes._
//an implicit Formats instance is needed; defaultFormats does no transformations
//it can be created as an implicit val instead of using this import
import org.datatools.bigdatatypes.formats.TransformKeys.defaultFormats

case class MyModel(myInt: Integer, myString: String)
val schema: StructType = SparkTypes[MyModel].sparkSchema
```
It works for Options, Sequences and any level of nested objects.
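
For example, a sketch with hypothetical nested case classes (these models are not part of the library):
```scala
import org.apache.spark.sql.types.StructType
import org.datatools.bigdatatypes.spark.SparkTypes._
import org.datatools.bigdatatypes.formats.TransformKeys.defaultFormats

//hypothetical models: Options, Lists and nested structs are resolved recursively
case class Address(street: String, zipCode: Option[String])
case class User(id: Long, tags: List[String], address: Address)

val nestedSchema: StructType = SparkTypes[User].sparkSchema
```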

A Spark Schema can also be extracted from a Case Class instance
```scala
val model = MyModel(1, "test")
model.sparkSchema
```
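
The derived schema can then be used like any other Spark schema; for instance, a sketch assuming a running SparkSession and a hypothetical JSON path:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("example").getOrCreate()
//enforce the derived schema when reading semi-structured data (hypothetical path)
val df: DataFrame = spark.read.schema(model.sparkSchema).json("/path/to/data.json")
```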
### Field transformations
Custom transformations can also be applied to field names, something that is usually quite hard to do with Spark Datasets.
For example, you can work with CamelCase Case Classes while using snake_case field names in the Spark Schema.

```scala
import org.apache.spark.sql.types.StructType
import org.datatools.bigdatatypes.spark.SparkTypes._
//implicit formats to transform keys into snake_case
import org.datatools.bigdatatypes.formats.TransformKeys.snakifyFields

case class MyModel(myInt: Integer, myString: String)
val schema: StructType = SparkTypes[MyModel].sparkSchema
//schema will have "my_int" and "my_string" fields
```
17 changes: 14 additions & 3 deletions build.sbt
@@ -1,13 +1,13 @@
name := "big-data-types"

//used to build Sonatype releases
version := "0.0.7"
version := "0.1.0"

lazy val scala213 = "2.13.3"
lazy val scala212 = "2.12.12"
lazy val scala211 = "2.11.12"
lazy val supportedScalaVersions = List(scala213, scala212)
scalaVersion := scala213
scalaVersion := scala212

crossVersionSharedSources
//crossScalaVersions := Nil
@@ -33,9 +33,20 @@ libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.clapper" %% "grizzled-slf4j" % "1.3.4",
"com.chuusai" %% "shapeless" % "2.3.3",
"com.google.cloud" % "google-cloud-bigquery" % "1.124.2"
"com.google.cloud" % "google-cloud-bigquery" % "1.124.2",

)

//dependencies for Spark - Scala 2.12 only
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, scalaMajor)) if scalaMajor < 13 =>
Seq("org.apache.spark" %% "spark-core" % "3.0.1" % Provided,
"org.apache.spark" %% "spark-sql" % "3.0.1" % Provided)
case _ => Seq()
}
}

lazy val scalatest = "org.scalatest" %% "scalatest" % "3.2.2"

libraryDependencies ++= Seq(
@@ -55,29 +55,29 @@ private[bigquery] object BigQueryDefinitions {

/** Generates a BigQuery Table Schema given a type A
*/
def generateSchema[A: BigQueryTypes]: Schema = Schema.of(toJava(BigQueryTypes[A].getBigQueryFields))
def generateSchema[A: BigQueryTypes]: Schema = Schema.of(toJava(BigQueryTypes[A].bigQueryFields))

def generateSchema[A: BigQueryTypes, B: BigQueryTypes]: Schema =
Schema.of(toJava(BigQueryTypes[A].getBigQueryFields ++ BigQueryTypes[B].getBigQueryFields))
Schema.of(toJava(BigQueryTypes[A].bigQueryFields ++ BigQueryTypes[B].bigQueryFields))

def generateSchema[A: BigQueryTypes, B: BigQueryTypes, C: BigQueryTypes]: Schema =
Schema.of(toJava(BigQueryTypes[A].getBigQueryFields ++ BigQueryTypes[B].getBigQueryFields ++ BigQueryTypes[C].getBigQueryFields))
Schema.of(toJava(BigQueryTypes[A].bigQueryFields ++ BigQueryTypes[B].bigQueryFields ++ BigQueryTypes[C].bigQueryFields))

def generateSchema[A: BigQueryTypes, B: BigQueryTypes, C: BigQueryTypes, D: BigQueryTypes]: Schema =
Schema.of(
toJava(BigQueryTypes[A].getBigQueryFields ++
BigQueryTypes[B].getBigQueryFields ++
BigQueryTypes[C].getBigQueryFields ++
BigQueryTypes[D].getBigQueryFields)
toJava(BigQueryTypes[A].bigQueryFields ++
BigQueryTypes[B].bigQueryFields ++
BigQueryTypes[C].bigQueryFields ++
BigQueryTypes[D].bigQueryFields)
)

def generateSchema[A: BigQueryTypes, B: BigQueryTypes, C: BigQueryTypes, D: BigQueryTypes, E: BigQueryTypes]: Schema =
Schema.of(
toJava(BigQueryTypes[A].getBigQueryFields ++
BigQueryTypes[B].getBigQueryFields ++
BigQueryTypes[C].getBigQueryFields ++
BigQueryTypes[D].getBigQueryFields ++
BigQueryTypes[E].getBigQueryFields)
toJava(BigQueryTypes[A].bigQueryFields ++
BigQueryTypes[B].bigQueryFields ++
BigQueryTypes[C].bigQueryFields ++
BigQueryTypes[D].bigQueryFields ++
BigQueryTypes[E].bigQueryFields)
)

}
@@ -17,7 +17,7 @@ trait BigQueryTypes[A] {

/** @return a list of [[Field]]s that represents [[A]]
*/
def getBigQueryFields: List[Field]
def bigQueryFields: List[Field]
}

object BigQueryTypes {
@@ -28,12 +28,11 @@ object BigQueryTypes {
/** Factory constructor - allows easier construction of instances */
def instance[A](fs: List[Field]): BigQueryTypes[A] =
new BigQueryTypes[A] {
def getBigQueryFields: List[Field] = fs
def bigQueryFields: List[Field] = fs
}

/** Instance derivation via SqlTypeConversion.
* Automatically converts camelCase names into snake_case in the process
* TODO: pass a function as a parameter, we should be able to decide if we want snake_case or other things from outside
*/
implicit def fieldsFromSqlTypeConversion[A: SqlTypeConversion](implicit f: Formats): BigQueryTypes[A] =
instance(getSchema(SqlTypeConversion[A].getType))
@@ -82,6 +81,6 @@ object BigQueryTypes {
* @tparam A is a Case Class
*/
implicit class BigQueryFieldSyntax[A <: Product](value: A) {
def getBigQueryFields(implicit a: BigQueryTypes[A]): List[Field] = a.getBigQueryFields
def getBigQueryFields(implicit a: BigQueryTypes[A]): List[Field] = a.bigQueryFields
}
}
108 changes: 108 additions & 0 deletions src/main/scala_2.13-/org/datatools/bigdatatypes/spark/SparkTypes.scala
@@ -0,0 +1,108 @@
package org.datatools.bigdatatypes.spark

import org.apache.spark.sql.types._
import org.datatools.bigdatatypes.conversions.SqlTypeConversion
import org.datatools.bigdatatypes.formats.Formats
import org.datatools.bigdatatypes.types.basic._

/** Type class to convert generic SqlTypes into Spark specific fields
* In Spark, a schema is made of a Struct of fields so, as an example:
* a case class will be converted into SqlTypes and then into a Struct of Spark fields
*
* @tparam A the type we want to obtain a schema from
*/
trait SparkTypes[A] {

/** @return a list of [[StructField]]s that represents [[A]]
*/
def sparkFields: List[StructField]

/**
* Returns the Spark Schema
* @return [[StructType]] with the schema to be used in Spark
*/
def sparkSchema: StructType = StructType(sparkFields)
}

object SparkTypes {

/** Summoner method. Allows the syntax SparkTypes[A] */
def apply[A](implicit instance: SparkTypes[A]): SparkTypes[A] = instance

/** Factory constructor - allows easier construction of instances */
def instance[A](fs: List[StructField]): SparkTypes[A] =
new SparkTypes[A] {
def sparkFields: List[StructField] = fs
}

/** Instance derivation via SqlTypeConversion.
*/
implicit def fieldsFromSqlTypeConversion[A: SqlTypeConversion](implicit f: Formats): SparkTypes[A] =
instance(getSchema(SqlTypeConversion[A].getType))

/** Creates the schema (list of fields)
* Applies an implicit [[Formats.transformKeys]] in the process
* @param sqlType [[SqlType]]
* @param f [[Formats]] to apply while constructing the schema
* @return List of [[StructField]] representing the schema of the given type
*/
private def getSchema(sqlType: SqlType)(implicit f: Formats): List[StructField] = sqlType match {
case SqlStruct(Nil, _) => Nil
case SqlStruct((name, sqlType) :: records, mode) =>
getSchemaWithName(f.transformKeys(name), sqlType) :: getSchema(SqlStruct(records, mode))
}

/** Basic SqlTypes conversions to Spark StructFields
* TODO: Use Formats to specify a default precision for DecimalType
*/
private def getSchemaWithName(name: String, sqlType: SqlType)(implicit f: Formats): StructField = sqlType match {
case SqlInt(mode) =>
StructField(name, sparkType(mode, IntegerType), isNullable(mode))
case SqlLong(mode) =>
StructField(name, sparkType(mode, LongType), isNullable(mode))
case SqlFloat(mode) =>
StructField(name, sparkType(mode, FloatType), isNullable(mode))
case SqlDecimal(mode) =>
StructField(name, sparkType(mode, DataTypes.createDecimalType), isNullable(mode))
case SqlBool(mode) =>
StructField(name, sparkType(mode, BooleanType), isNullable(mode))
case SqlString(mode) =>
StructField(name, sparkType(mode, StringType), isNullable(mode))
case SqlTimestamp(mode) =>
StructField(name, sparkType(mode, TimestampType), isNullable(mode))
case SqlDate(mode) =>
StructField(name, sparkType(mode, DateType), isNullable(mode))
case SqlStruct(subType, mode) =>
StructField(name, sparkType(mode, StructType(getSchema(SqlStruct(subType)))), isNullable(mode))
}

/** Find if a type has to be an ArrayType or a basic type
* @param mode [[SqlTypeMode]] needed to check repeated or not
* @param sparkType valid [[DataType]] from Spark
* @return Spark [[DataType]]
*/
private def sparkType(mode: SqlTypeMode, sparkType: DataType): DataType = mode match {
case Repeated => ArrayType(sparkType, containsNull = isNullable(mode))
case _ => sparkType
}

/** Check if a field has to be nullable or not based on its [[SqlTypeMode]]
* Repeated is marked as nullable
* @param sqlTypeMode [[SqlTypeMode]]
* @return true if the field has to be nullable, false otherwise
*/
private def isNullable(sqlTypeMode: SqlTypeMode): Boolean = sqlTypeMode match {
case Nullable => true
case Repeated => true
case Required => false
}

/** Allows syntax .sparkSchema and .sparkFields for case class instances
* @param value not used, needed for implicit
* @tparam A is a Case Class
*/
implicit class SparkSchemaSyntax[A <: Product](value: A) {
def sparkSchema(implicit a: SparkTypes[A]): StructType = a.sparkSchema
def sparkFields(implicit a: SparkTypes[A]): List[StructField] = a.sparkFields
}
}
29 changes: 29 additions & 0 deletions src/test/scala/org/datatools/bigdatatypes/TestTypes.scala
@@ -0,0 +1,29 @@
package org.datatools.bigdatatypes

import java.sql.{Date, Timestamp}

object TestTypes {

case class BasicTypes(myInt: Int,
myLong: Long,
myFloat: Float,
myDecimal: BigDecimal,
myBoolean: Boolean,
myString: String
)

case class BasicOptionTypes(myInt: Option[Int],
myLong: Option[Long],
myFloat: Option[Float],
myDecimal: Option[BigDecimal],
myBoolean: Option[Boolean],
myString: Option[String]
)
case class BasicOption(myString: String, myOptionalString: Option[String])
case class BasicList(myInt: Int, myList: List[Int])
case class BasicStruct(myInt: Int, myStruct: BasicTypes)
case class BasicOptionalStruct(myInt: Int, myStruct: Option[BasicTypes])
case class Point(x: Int, y: Int)
case class ListOfStruct(matrix: List[Point])
case class ExtendedTypes(myInt: Int, myTimestamp: Timestamp, myDate: Date)
}
@@ -22,7 +22,7 @@ class BigQueryTypesSpec extends UnitSpec {
}

"A Case Class type" should "return BigQuery Fields" in {
BigQueryTypes[Dummy].getBigQueryFields shouldBe expected
BigQueryTypes[Dummy].bigQueryFields shouldBe expected
}

}