[ADAM-1018] Add support for Spark SQL Datasets.
Resolves bigdatagenomics#1018. Adds the `adam-codegen` module, which generates classes that:

1. Implement the Scala Product interface and thus can be read into a Spark SQL
Dataset.
2. Have a complete constructor that is compatible with the constructor that
Spark SQL expects to see when exporting a Dataset back to Scala.
3. Have methods for converting to/from the bdg-formats Avro models.

We then build these model classes in the `org.bdgenomics.adam.sql` package
and use them for export from the Avro-based GenomicRDDs. With a Dataset, we
can then export to a DataFrame, which enables us to expose data through
Python via RDD->Dataset->DataFrame. This is important since the Avro classes
generated by bdg-formats can't be pickled, so we can't cross from a Java RDD
to a Python RDD with them.
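
As a rough sketch of the conversion path this enables (the generated
`org.bdgenomics.adam.sql.AlignmentRecord` class and the `sqlContext`/`avroRdd`
values here are illustrative, not part of this diff):

    import sqlContext.implicits._
    // avroRdd: RDD[org.bdgenomics.formats.avro.AlignmentRecord]
    // wrap each Avro record in its generated Product counterpart
    val productRdd = avroRdd.map(org.bdgenomics.adam.sql.AlignmentRecord.fromAvro)
    // the generated classes implement Product, so Spark SQL can derive an encoder
    val ds = sqlContext.createDataset(productRdd)
    // a DataFrame is what we can hand across to Python
    val df = ds.toDF()
    // and back out again, via the generated toAvro method
    val avroAgain = ds.rdd.map(_.toAvro)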
fnothaft committed May 22, 2017
1 parent 4a8d769 commit 1df2b75
Showing 17 changed files with 671 additions and 5 deletions.
4 changes: 4 additions & 0 deletions adam-apis/pom.xml
@@ -142,5 +142,9 @@
      <artifactId>scalatest_${scala.version.prefix}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version.prefix}</artifactId>
    </dependency>
  </dependencies>
</project>
4 changes: 4 additions & 0 deletions adam-cli/pom.xml
@@ -193,5 +193,9 @@
      <artifactId>scala-guice_${scala.version.prefix}</artifactId>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version.prefix}</artifactId>
    </dependency>
  </dependencies>
</project>
98 changes: 98 additions & 0 deletions adam-codegen/pom.xml
@@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.bdgenomics.adam</groupId>
    <artifactId>adam-parent_2.10</artifactId>
    <version>0.23.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>adam-codegen_2.10</artifactId>
  <packaging>jar</packaging>
  <name>ADAM_${scala.version.prefix}: Avro-to-Dataset codegen utils</name>
  <properties>
    <timestamp>${maven.build.timestamp}</timestamp>
    <maven.build.timestamp.format>yyyy-MM-dd</maven.build.timestamp.format>
  </properties>
  <build>
    <plugins>
      <!-- disable surefire -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <configuration>
          <reportsDirectory>${project.build.directory}/scalatest-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>ADAMTestSuite.txt</filereports>
          <!--
            As explained here: http://stackoverflow.com/questions/1660441/java-flag-to-enable-extended-serialization-debugging-info
            The second option allows us better debugging for serialization-based errors.
          -->
          <argLine>-Xmx1024m -Dsun.io.serialization.extendedDebugInfo=true</argLine>
          <stdout>F</stdout>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.version.prefix}</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
280 changes: 280 additions & 0 deletions adam-codegen/src/main/scala/org/bdgenomics/adam/codegen/DumpSchemasToProduct.scala
@@ -0,0 +1,280 @@
/**
 * Licensed to Big Data Genomics (BDG) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The BDG licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.bdgenomics.adam.codegen

import java.io.{ File, FileWriter }
import org.apache.avro.Schema
import org.apache.avro.reflect.ReflectData
import scala.collection.JavaConversions._

object DumpSchemasToProduct {

  def main(args: Array[String]) {
    new DumpSchemasToProduct()(args)
  }
}

class DumpSchemasToProduct {

  private def getSchemaByReflection(className: String): Schema = {

    // load the class
    val classLoader = Thread.currentThread().getContextClassLoader()
    val klazz = classLoader.loadClass(className)

    // get the schema through reflection
    ReflectData.get().getSchema(klazz)
  }

  private def toMatch(fields: Seq[(String, String)]): String = {
    fields.map(_._1)
      .zipWithIndex
      .map(vk => {
        val (field, idx) = vk
        "    case %d => %s".format(idx, field)
      }).mkString("\n")
  }

  private def getType(schema: Schema): String = schema.getType match {
    case Schema.Type.DOUBLE  => "Double"
    case Schema.Type.FLOAT   => "Float"
    case Schema.Type.INT     => "Int"
    case Schema.Type.LONG    => "Long"
    case Schema.Type.BOOLEAN => "Boolean"
    case Schema.Type.STRING  => "String"
    case Schema.Type.ENUM    => "String"
    case Schema.Type.RECORD  => schema.getName()
    case other               => throw new IllegalStateException("Unsupported type %s.".format(other))
  }

  private def getUnionType(schema: Schema): Schema = {
    val unionTypes = schema.getTypes()
      .filter(t => {
        t.getType != Schema.Type.NULL
      })
    assert(unionTypes.size == 1)
    unionTypes.head
  }

  private def fields(schema: Schema): Seq[(String, String)] = {
    schema.getFields()
      .map(field => {
        val name = field.name
        val fieldSchema = field.schema
        val fieldType = fieldSchema.getType match {
          case Schema.Type.ARRAY => {
            "Seq[%s]".format(getType(fieldSchema.getElementType()))
          }
          case Schema.Type.MAP => {
            "scala.collection.Map[String,%s]".format(getType(fieldSchema.getValueType()))
          }
          case Schema.Type.UNION => {
            "Option[%s]".format(getType(getUnionType(fieldSchema)))
          }
          case other => {
            throw new IllegalStateException("Unsupported type %s in field %s.".format(other, name))
          }
        }
        (name, fieldType)
      }).toSeq
  }

  private def conversion(schema: Schema, mapFn: String): String = schema.getType match {
    case Schema.Type.DOUBLE  => ".%s(d => d: java.lang.Double)".format(mapFn)
    case Schema.Type.FLOAT   => ".%s(f => f: java.lang.Float)".format(mapFn)
    case Schema.Type.INT     => ".%s(i => i: java.lang.Integer)".format(mapFn)
    case Schema.Type.LONG    => ".%s(l => l: java.lang.Long)".format(mapFn)
    case Schema.Type.BOOLEAN => ".%s(b => b: java.lang.Boolean)".format(mapFn)
    case Schema.Type.STRING  => ""
    case Schema.Type.ENUM    => ".%s(e => %s.valueOf(e))".format(mapFn, schema.getFullName)
    case Schema.Type.RECORD  => ".%s(r => r.toAvro)".format(mapFn)
    case other               => throw new IllegalStateException("Unsupported type %s.".format(other))
  }

  private def setters(schema: Schema): String = {
    schema.getFields
      .map(field => {
        val name = field.name

        field.schema.getType match {
          case Schema.Type.UNION => {
            getUnionType(field.schema).getType match {
              case Schema.Type.RECORD => "    %s.foreach(field => record.set%s(field.toAvro))".format(name, name.capitalize)
              case Schema.Type.ENUM   => "    %s.foreach(field => record.set%s(%s.valueOf(field)))".format(name, name.capitalize, getUnionType(field.schema).getFullName)
              case Schema.Type.DOUBLE | Schema.Type.FLOAT |
                Schema.Type.INT | Schema.Type.LONG |
                Schema.Type.BOOLEAN | Schema.Type.STRING => "    %s.foreach(field => record.set%s(field))".format(name, name.capitalize)
              case other => throw new IllegalStateException("Unsupported type %s.".format(other))
            }
          }
          case Schema.Type.ARRAY => {
            val convAction = conversion(field.schema.getElementType(), "map")
            "    if (%s.nonEmpty) {\n      record.set%s(%s%s)\n    }".format(name, name.capitalize, name, convAction)
          }
          case Schema.Type.MAP => {
            val convAction = conversion(field.schema.getValueType(), "mapValues")
            "    if (%s.nonEmpty) {\n      record.set%s(%s%s.asJava)\n    }".format(name, name.capitalize, name, convAction)
          }
          case _ => {
            throw new IllegalArgumentException("Bad type %s.".format(field.schema))
          }
        }
      }).mkString("\n")
  }

  private def dumpToAvroFn(schema: Schema): String = {
    "    val record = new %s()\n%s\n    record".format(schema.getFullName,
      setters(schema))
  }

  private def generateClassDump(className: String): String = {

    // get schema
    val schema = getSchemaByReflection(className)

    // get class name without package
    val classNameNoPackage = className.split('.').last

    // the typed pattern (_: ClassName) in canEqual checks the instance type
    "\n%s\n\nclass %s (\n%s) extends Product {\n  def productArity: Int = %d\n  def productElement(i: Int): Any = i match {\n%s\n  }\n  def toAvro: %s = {\n%s\n  }\n  def canEqual(that: Any): Boolean = that match {\n    case _: %s => true\n    case _ => false\n  }\n}".format(
      dumpObject(schema),
      classNameNoPackage,
      fields(schema).map(p => "  val %s: %s".format(p._1, p._2)).mkString(",\n"),
      schema.getFields().size,
      toMatch(fields(schema)),
      schema.getFullName,
      dumpToAvroFn(schema),
      classNameNoPackage
    )
  }

  private def getConversion(schema: Schema, mapFn: String): String = schema.getType match {
    case Schema.Type.DOUBLE  => ".%s(d => d: Double)".format(mapFn)
    case Schema.Type.FLOAT   => ".%s(f => f: Float)".format(mapFn)
    case Schema.Type.INT     => ".%s(i => i: Int)".format(mapFn)
    case Schema.Type.LONG    => ".%s(l => l: Long)".format(mapFn)
    case Schema.Type.BOOLEAN => ".%s(b => b: Boolean)".format(mapFn)
    case Schema.Type.STRING  => ""
    case Schema.Type.ENUM    => ".%s(e => e.toString)".format(mapFn)
    case Schema.Type.RECORD  => ".%s(r => %s.fromAvro(r))".format(mapFn, schema.getName)
    case other               => throw new IllegalStateException("Unsupported type %s.".format(other))
  }

  private def getters(schema: Schema): String = {
    schema.getFields
      .map(field => {
        val name = field.name

        field.schema.getType match {
          case Schema.Type.UNION => {
            getUnionType(field.schema).getType match {
              case Schema.Type.RECORD => "      Option(record.get%s).map(field => %s.fromAvro(field))".format(name.capitalize, getUnionType(field.schema).getName)
              case Schema.Type.ENUM   => "      Option(record.get%s).map(field => field.toString)".format(name.capitalize)
              case Schema.Type.DOUBLE | Schema.Type.FLOAT |
                Schema.Type.INT | Schema.Type.LONG |
                Schema.Type.BOOLEAN | Schema.Type.STRING => "      Option(record.get%s)%s".format(name.capitalize, getConversion(getUnionType(field.schema), "map"))
              case other => throw new IllegalStateException("Unsupported type %s.".format(other))
            }
          }
          case Schema.Type.ARRAY => {
            val convAction = getConversion(field.schema.getElementType(), "map")
            "      record.get%s().toSeq%s".format(name.capitalize, convAction)
          }
          case Schema.Type.MAP => {
            val convAction = getConversion(field.schema.getValueType(), "mapValues")
            "      record.get%s()%s.asScala".format(name.capitalize, convAction)
          }
          case _ => {
            throw new IllegalArgumentException("Bad type %s.".format(field.schema))
          }
        }
      }).mkString(",\n")
  }

  private def dumpObject(schema: Schema): String = {
    "object %s extends Serializable {\n  def apply(\n%s): %s = {\n    new %s(\n%s)\n  }\n  def fromAvro(record: %s): %s = {\n    new %s (\n%s)\n  }\n}".format(
      schema.getName,
      fields(schema).map(p => "    %s: %s".format(p._1, p._2)).mkString(",\n"),
      schema.getName,
      schema.getName,
      fields(schema).map(_._1).map(s => "      %s".format(s)).mkString(",\n"),
      schema.getFullName,
      schema.getName,
      schema.getName,
      getters(schema))
  }

  private def writeHeader(fw: FileWriter, packageName: String) {
    val hdr = Seq(
      "/**",
      " * Licensed to Big Data Genomics (BDG) under one",
      " * or more contributor license agreements. See the NOTICE file",
      " * distributed with this work for additional information",
      " * regarding copyright ownership. The BDG licenses this file",
      " * to you under the Apache License, Version 2.0 (the",
      " * \"License\"); you may not use this file except in compliance",
      " * with the License. You may obtain a copy of the License at",
      " *",
      " *     http://www.apache.org/licenses/LICENSE-2.0",
      " *",
      " * Unless required by applicable law or agreed to in writing, software",
      " * distributed under the License is distributed on an \"AS IS\" BASIS,",
      " * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.",
      " * See the License for the specific language governing permissions and",
      " * limitations under the License.",
      " */",
      "package %s".format(packageName),
      "",
      "import scala.collection.JavaConversions._",
      "import scala.collection.JavaConverters._").mkString("\n")

    fw.write(hdr)
  }

  def apply(args: Array[String]) {

    if (args.length < 3) {
      println("DumpSchemasToProduct <package-name> <class-to-dump> ... <file-to-dump-to>")
      System.exit(1)
    } else {

      // drop the package name and the file to write
      val classesToDump = args.drop(1).dropRight(1)

      // open the file to write, creating parent directories if needed
      val dir = new File(args.last).getParentFile
      if (!dir.exists()) {
        dir.mkdirs()
      }
      val fw = new FileWriter(args.last)

      // write the header
      writeHeader(fw, args.head)

      // loop and dump the classes
      classesToDump.foreach(className => {
        val dumpString = generateClassDump(className)

        fw.write("\n")
        fw.write(dumpString)
      })

      // we are done, so flush and close
      fw.close()
    }
  }
}
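
For reference, the generator takes the target package name first, then one or
more Avro class names, then the file to write. A hypothetical invocation (the
class and output path below are illustrative, not part of this diff):

    org.bdgenomics.adam.codegen.DumpSchemasToProduct.main(Array(
      "org.bdgenomics.adam.sql",                     // package for the generated classes
      "org.bdgenomics.formats.avro.AlignmentRecord", // Avro model to wrap
      "target/generated-sources/Schemas.scala"))     // file to dump the classes to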