28 changes: 4 additions & 24 deletions .travis.yml
@@ -5,37 +5,16 @@ cache:
- $HOME/.ivy2
# There's no nicer way to specify this matrix; see
# https://github.com/travis-ci/travis-ci/issues/1519.
# Spark 1.5.0 only supports Java 7+.
matrix:
include:
# We only test Spark 1.4.1 with Hadoop 2.2.0 because
# https://github.com/apache/spark/pull/6599 is not present in 1.4.1,
# so the published Spark Maven artifacts will not work with Hadoop 1.x.
- jdk: openjdk6
scala: 2.10.5
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.4.1" SPARK_AVRO_VERSION="2.0.1"
- jdk: openjdk7
scala: 2.10.5
env: HADOOP_VERSION="1.0.4" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1"
- jdk: openjdk7
scala: 2.10.5
env: HADOOP_VERSION="1.2.1" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1"
# Scala 2.10.5 tests:
- jdk: openjdk7
scala: 2.10.5
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1"
- jdk: openjdk7
scala: 2.10.5
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.6.0" SPARK_AVRO_VERSION="2.0.1"
# Configuration corresponding to DBC 1.4.x driver package as of DBC 2.4,
# which uses spark-avro 1.0.0. We use Hadoop 2.2.0 here, while DBC uses
# 1.2.1, because the 1.4.1 published to Maven Central is a Hadoop 2.x build.
- jdk: openjdk7
scala: 2.10.5
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.4.1" SPARK_AVRO_VERSION="1.0.0"
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.1-SNAPSHOT" SPARK_AVRO_VERSION="3.0.0-preview2"
# Scala 2.11 tests:
- jdk: openjdk7
scala: 2.11.7
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1"
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.1-SNAPSHOT" SPARK_AVRO_VERSION="3.0.0-preview2"
env:
global:
# AWS_REDSHIFT_JDBC_URL
@@ -55,5 +34,6 @@ env:

script:
- ./dev/run-tests-travis.sh

after_success:
- bash <(curl -s https://codecov.io/bash)
28 changes: 15 additions & 13 deletions README.md
@@ -22,22 +22,25 @@ This library is more suited to ETL than interactive queries, since large amounts

## Installation

This library requires Apache Spark 1.4+ and Amazon Redshift 1.0.963+.
This library requires Apache Spark 2.0+ and Amazon Redshift 1.0.963+.
Contributor: Given that this is going to be the README when people browse to the repository on GitHub, we should probably have a pointer to a branch containing the Spark 1.x version of the code or a table listing the releases for the Spark 1.x line.

Contributor Author: Should I do this after making the branch for Spark 1.x?

Contributor: Created a new branch at https://github.com/databricks/spark-redshift/tree/branch-1.x for holding the 1.x code.

For a version that works with Spark 1.x, please check the [1.x branch](https://github.com/databricks/spark-redshift/tree/branch-1.x).

You may use this library in your applications with the following dependency information:

**Scala 2.10**

```
groupId: com.databricks
artifactId: spark-redshift_2.10
version: 1.0.0
version: 2.0.0-SNAPSHOT
```

**Scala 2.11**
```
groupId: com.databricks
artifactId: spark-redshift_2.11
version: 1.0.0
version: 2.0.0-SNAPSHOT
```
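
If you manage the dependency with sbt, the coordinates above translate to roughly the following (a minimal sketch; the snapshot resolver is an assumption, needed only while depending on a `-SNAPSHOT` build):

```
// build.sbt (sketch)
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

// %% appends your project's Scala version (2.10 or 2.11) to the artifact name
libraryDependencies += "com.databricks" %% "spark-redshift" % "2.0.0-SNAPSHOT"
```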

You will also need to provide a JDBC driver that is compatible with Redshift. Amazon recommend that you use [their driver](http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html), which is distributed as a JAR that is hosted on Amazon's website. This library has also been successfully tested using the Postgres JDBC driver.
@@ -81,19 +84,19 @@ val df: DataFrame = sqlContext.read

df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3n://path/for/temp/data")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3n://path/for/temp/data")
.mode("error")
.save()

// Using IAM Role based authentication
df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.option("tempdir", "s3n://path/for/temp/data")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.option("tempdir", "s3n://path/for/temp/data")
.mode("error")
.save()
```
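
For reference, a minimal read in the same style (the full read example is collapsed in the diff above); the option values are placeholders mirroring the write examples:

```
// Read a table back into a DataFrame (a "query" option may be used instead of "dbtable")
val df: DataFrame = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://path/for/temp/data")
  .load()
```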
@@ -130,8 +133,7 @@ df.write \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("error") \
.save()



# Using IAM Role based authentication
df.write \
.format("com.databricks.spark.redshift") \
13 changes: 8 additions & 5 deletions project/SparkRedshiftBuild.scala
@@ -42,19 +42,22 @@ object SparkRedshiftBuild extends Build {
.settings(
name := "spark-redshift",
organization := "com.databricks",
scalaVersion := "2.10.5",
scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
sparkVersion := "1.4.1",
sparkVersion := "2.0.1-SNAPSHOT",
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("2.0.1"),
testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0-preview2"),
testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
spName := "databricks/spark-redshift",
sparkComponents ++= Seq("sql", "hive"),
spIgnoreProvided := true,
licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"),
credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
resolvers +=
resolvers ++= Seq(
"Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
// TODO: remove after Spark 2.0.0 is released:
"Apache Snapshots" at "https://repository.apache.org/snapshots/"
),
scalacOptions ++= Seq("-target:jvm-1.6"),
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
libraryDependencies ++= Seq(
@@ -75,7 +78,7 @@
"com.amazonaws" % "aws-java-sdk-sts" % "1.10.22" % "test" exclude("com.fasterxml.jackson.core", "jackson-databind"),
// We require spark-avro, but avro-mapred must be provided to match Hadoop version.
// In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
"com.databricks" %% "spark-avro" % "2.0.1",
"com.databricks" %% "spark-avro" % "3.0.0-preview2",
if (testHadoopVersion.value.startsWith("1")) {
"org.apache.avro" % "avro-mapred" % "1.7.7" % "provided" classifier "hadoop1" exclude("org.mortbay.jetty", "servlet-api")
} else {
@@ -119,7 +119,7 @@ trait IntegrationSuiteBase

override protected def beforeEach(): Unit = {
super.beforeEach()
sqlContext = new TestHiveContext(sc)
sqlContext = new TestHiveContext(sc, loadTestTables = false)
}

/**
@@ -285,8 +285,8 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase {
assume(org.apache.spark.SPARK_VERSION.take(3) >= "1.6")
val df = sqlContext.sql("select testbool from test_table where testbool = true")
val physicalPlan = df.queryExecution.sparkPlan
physicalPlan.collectFirst { case f: execution.Filter => f }.foreach { filter =>
fail(s"Filter should have been eliminated; plan is:\n$physicalPlan")
physicalPlan.collectFirst { case f: execution.FilterExec => f }.foreach { filter =>
fail(s"Filter should have been eliminated:\n${df.queryExecution}")
}
}

@@ -355,7 +355,7 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase {
// .rdd() forces the first query to be unloaded from Redshift
val rdd1 = sqlContext.sql("select testint from test_table").rdd
// Similarly, this also forces an unload:
val rdd2 = sqlContext.sql("select testdouble from test_table").rdd
sqlContext.sql("select testdouble from test_table").rdd
// If the unloads were performed into the same directory then this call would fail: the
// second unload from rdd2 would have overwritten the integers with doubles, so we'd get
// a NumberFormatException.
@@ -599,9 +599,9 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase {
}

test("Respect SaveMode.ErrorIfExists when table exists") {
val rdd = sc.parallelize(TestUtils.expectedData.toSeq)
val rdd = sc.parallelize(TestUtils.expectedData)
val df = sqlContext.createDataFrame(rdd, TestUtils.testSchema)
df.registerTempTable(test_table) // to ensure that the table already exists
df.createOrReplaceTempView(test_table) // to ensure that the table already exists

// Check that SaveMode.ErrorIfExists throws an exception
intercept[AnalysisException] {
32 changes: 13 additions & 19 deletions src/main/scala/com/databricks/spark/redshift/FilterPushdown.scala
@@ -18,8 +18,6 @@ package com.databricks.spark.redshift

import java.sql.{Date, Timestamp}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

@@ -46,32 +44,28 @@
* could not be converted.
*/
def buildFilterExpression(schema: StructType, filter: Filter): Option[String] = {
def buildComparison(attr: String, internalValue: Any, comparisonOp: String): Option[String] = {
def buildComparison(attr: String, value: Any, comparisonOp: String): Option[String] = {
getTypeForAttribute(schema, attr).map { dataType =>
val value: Any = {
// Workaround for SPARK-10195: prior to Spark 1.5.0, the Data Sources API exposed internal
// types, so we must perform conversions if running on older versions:
if (SPARK_VERSION < "1.5.0") {
CatalystTypeConverters.convertToScala(internalValue, dataType)
} else {
internalValue
}
val sqlEscapedValue: String = dataType match {
case StringType => s"\\'${value.toString.replace("'", "\\'\\'")}\\'"
case DateType => s"\\'${value.asInstanceOf[Date]}\\'"
case TimestampType => s"\\'${value.asInstanceOf[Timestamp]}\\'"
case _ => value.toString
}
val sqlEscapedValue: String = dataType match {
case StringType => s"\\'${value.toString.replace("'", "\\'\\'")}\\'"
case DateType => s"\\'${value.asInstanceOf[Date]}\\'"
case TimestampType => s"\\'${value.asInstanceOf[Timestamp]}\\'"
case _ => value.toString
}
s""""$attr" $comparisonOp $sqlEscapedValue"""
}
s""""$attr" $comparisonOp $sqlEscapedValue"""
}
}

filter match {
case EqualTo(attr, value) => buildComparison(attr, value, "=")
case LessThan(attr, value) => buildComparison(attr, value, "<")
case GreaterThan(attr, value) => buildComparison(attr, value, ">")
case LessThanOrEqual(attr, value) => buildComparison(attr, value, "<=")
case GreaterThanOrEqual(attr, value) => buildComparison(attr, value, ">=")
case IsNotNull(attr) =>
getTypeForAttribute(schema, attr).map(dataType => s""""$attr" IS NOT NULL""")
case IsNull(attr) =>
getTypeForAttribute(schema, attr).map(dataType => s""""$attr" IS NULL""")
case _ => None
}
}
@@ -100,12 +100,7 @@ private[redshift] class RedshiftRecordReader extends RecordReader[JavaLong, Arra
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
val split = inputSplit.asInstanceOf[FileSplit]
val file = split.getPath
val conf: Configuration = {
// Use reflection to get the Configuration. This is necessary because TaskAttemptContext is
// a class in Hadoop 1.x and an interface in Hadoop 2.x.
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}
val conf: Configuration = context.getConfiguration
delimiter = RedshiftInputFormat.getDelimiterOrDefault(conf).asInstanceOf[Byte]
require(delimiter != escapeChar,
s"The delimiter and the escape char cannot be the same but found $delimiter.")
@@ -28,6 +28,7 @@ import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory

@@ -88,28 +89,6 @@ private[redshift] class JDBCWrapper {
}
}

/**
* Reflectively calls Spark's `DriverRegistry.register()`, which handles corner-cases related to
* using JDBC drivers that are not accessible from the bootstrap classloader.
*/
private def registerDriver(driverClass: String): Unit = {
// DriverRegistry.register() is one of the few pieces of private Spark functionality which
// we need to rely on. This class was relocated in Spark 1.5.0, so we need to use reflection
// in order to support both Spark 1.4.x and 1.5.x.
if (SPARK_VERSION.startsWith("1.4")) {
val className = "org.apache.spark.sql.jdbc.package$DriverRegistry$"
val driverRegistryClass = Utils.classForName(className)
val registerMethod = driverRegistryClass.getDeclaredMethod("register", classOf[String])
val companionObject = driverRegistryClass.getDeclaredField("MODULE$").get(null)
registerMethod.invoke(companionObject, driverClass)
} else { // Spark 1.5.0+
val className = "org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry"
val driverRegistryClass = Utils.classForName(className)
val registerMethod = driverRegistryClass.getDeclaredMethod("register", classOf[String])
registerMethod.invoke(null, driverClass)
}
}

/**
* Execute the given SQL statement while supporting interruption.
* If InterruptedException is caught, then the statement will be cancelled if it is running.
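
A rough sketch of the interrupt-and-cancel behavior this comment describes (a hypothetical standalone helper, not necessarily this file's implementation):

```
import java.sql.PreparedStatement

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Run the statement on another thread; if the waiting thread is interrupted,
// cancel the in-flight statement and rethrow so the caller observes the interruption.
def executeInterruptibly(statement: PreparedStatement): Boolean = {
  val result = Future(statement.execute())
  try {
    Await.result(result, Duration.Inf)
  } catch {
    case e: InterruptedException =>
      statement.cancel()
      throw e
  }
}
```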
@@ -174,7 +153,6 @@ private[redshift] class JDBCWrapper {
while (i < ncols) {
val columnName = rsmd.getColumnLabel(i + 1)
val dataType = rsmd.getColumnType(i + 1)
val typeName = rsmd.getColumnTypeName(i + 1)
val fieldSize = rsmd.getPrecision(i + 1)
val fieldScale = rsmd.getScale(i + 1)
val isSigned = rsmd.isSigned(i + 1)
@@ -203,7 +181,7 @@
credentials: Option[(String, String)]) : Connection = {
val subprotocol = url.stripPrefix("jdbc:").split(":")(0)
val driverClass: String = getDriverClass(subprotocol, userProvidedDriverClass)
registerDriver(driverClass)
DriverRegistry.register(driverClass)
val driverWrapperClass: Class[_] = if (SPARK_VERSION.startsWith("1.4")) {
Utils.classForName("org.apache.spark.sql.jdbc.package$DriverWrapper")
} else { // Spark 1.5.0+
Expand All @@ -226,10 +204,9 @@ private[redshift] class JDBCWrapper {
throw new IllegalArgumentException(s"Did not find registered driver with class $driverClass")
}
val properties = new Properties()
credentials.foreach { case(user, password) => {
properties.setProperty("user", user)
properties.setProperty("password", password)
}
credentials.foreach { case(user, password) =>
properties.setProperty("user", user)
properties.setProperty("password", password)
}
driver.connect(url, properties)
}
@@ -81,10 +81,7 @@ private[redshift] case class RedshiftRelation(
writer.saveToRedshift(sqlContext, data, saveMode, params)
}

// In Spark 1.6+, this method allows a data source to declare which filters it handles, allowing
// Spark to skip its own defensive filtering. See SPARK-10978 for more details. As long as we
// compile against Spark 1.4, we cannot use the `override` modifier here.
def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
filters.filterNot(filter => FilterPushdown.buildFilterExpression(schema, filter).isDefined)
}

@@ -21,17 +21,17 @@ import java.sql.{Connection, Date, SQLException, Timestamp}

import com.amazonaws.auth.AWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.TaskContext
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.util.control.NonFatal

import com.databricks.spark.redshift.Parameters.MergedParameters

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.types._

/**
@@ -234,7 +234,7 @@ private[redshift] class RedshiftWriter(
val nonEmptyPartitions =
sqlContext.sparkContext.accumulableCollection(mutable.HashSet.empty[Int])

val convertedRows: RDD[Row] = data.mapPartitions { iter =>
val convertedRows: RDD[Row] = data.rdd.mapPartitions { iter: Iterator[Row] =>
if (iter.hasNext) {
nonEmptyPartitions += TaskContext.get.partitionId()
}
2 changes: 1 addition & 1 deletion src/main/scala/com/databricks/spark/redshift/package.scala
@@ -20,7 +20,7 @@ package com.databricks.spark
import com.amazonaws.services.s3.AmazonS3Client
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row, SaveMode, SQLContext}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}

package object redshift {

@@ -58,7 +58,9 @@ class FilterPushdownSuite extends FunSuite {
GreaterThan("test_double", 1000.0),
LessThan("test_double", Double.MaxValue),
GreaterThanOrEqual("test_float", 1.0f),
LessThanOrEqual("test_int", 43))
LessThanOrEqual("test_int", 43),
IsNotNull("test_int"),
IsNull("test_int"))
val whereClause = buildWhereClause(testSchema, filters)
// scalastyle:off
val expectedWhereClause =
@@ -69,6 +71,8 @@
|AND "test_double" < 1.7976931348623157E308
|AND "test_float" >= 1.0
|AND "test_int" <= 43
|AND "test_int" IS NOT NULL
|AND "test_int" IS NULL
""".stripMargin.lines.mkString(" ").trim
// scalastyle:on
assert(whereClause === expectedWhereClause)