diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 4288ffb2..b8eed869 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -147,17 +147,6 @@ private[redshift] object Parameters {
     */
    def jdbcDriver: Option[String] = parameters.get("jdbcdriver")
 
-  /**
-   * If true, when writing, replace any existing data. When false, append to the table instead.
-   * Note that the table schema will need to be compatible with whatever you have in the DataFrame
-   * you're writing. spark-redshift makes no attempt to enforce that - you'll just see Redshift
-   * errors if they don't match.
-   *
-   * Defaults to false.
-   */
-  @deprecated("Use SaveMode instead", "0.5.0")
-  def overwrite: Boolean = parameters("overwrite").toBoolean
-
   /**
    * Set the Redshift table distribution style, which can be one of: EVEN, KEY or ALL. If you set
    * it to KEY, you'll also need to use the distkey parameter to set the distribution key.
diff --git a/src/main/scala/com/databricks/spark/redshift/SchemaParser.scala b/src/main/scala/com/databricks/spark/redshift/SchemaParser.scala
deleted file mode 100644
index d58c6f22..00000000
--- a/src/main/scala/com/databricks/spark/redshift/SchemaParser.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2014 Databricks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.redshift
-
-import scala.util.parsing.combinator._
-
-import org.apache.spark.sql.types._
-
-/**
- * A simple parser for Redshift table schemas.
- *
- * Note: the only method which uses this class has been deprecated, so this class should be
- * removed in `spark-redshift` 0.6. We will not accept patches to extend this parser.
- */
-@deprecated("Do not use SchemaParser directly", "0.5.0")
-private[redshift] object SchemaParser extends JavaTokenParsers {
-  // redshift data types: http://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
-  private val SMALLINT: Parser[DataType] = ("smallint" | "int2") ^^^ ShortType
-  private val INTEGER: Parser[DataType] = ("integer" | "int" | "int4") ^^^ IntegerType
-  private val BIGINT: Parser[DataType] = ("bigint" | "int8") ^^^ LongType
-  private val DECIMAL: Parser[DataType] = // map all decimal to long for now
-    ("decimal" | "numeric") ~ "(" ~ decimalNumber ~ "," ~ decimalNumber ~ ")" ^^^ LongType
-  private val REAL: Parser[DataType] = ("real" | "float4") ^^^ FloatType
-  private val DOUBLE: Parser[DataType] = ("double precision" | "float" | "float8") ^^^ DoubleType
-  private val BOOLEAN: Parser[DataType] = "boolean" ^^^ BooleanType
-  private val VARCHAR: Parser[DataType] =
-    ("varchar" | "character varying" | "nvarchar"
-      | "text" | "char" | "character"
-      | "nchar" | "bpchar") ~ (("(" ~ decimalNumber ~ ")") | "") ^^^ StringType
-  private val DATE: Parser[DataType] = "date" ^^^ DateType
-  private val TIMESTAMP: Parser[DataType] =
-    ("timestamp" | "timestamp without time zone") ^^^ TimestampType
-
-  private val sqlType: Parser[DataType] =
-    SMALLINT | INTEGER | BIGINT | DECIMAL | VARCHAR | DATE | BOOLEAN | REAL | DOUBLE | TIMESTAMP
-  private val structField: Parser[StructField] = (ident ~ sqlType) ^^ {
-    case colName ~ colType => StructField(colName, colType, nullable = true)
-  }
-  private val structType: Parser[StructType] = structField.* ^^ {
-    case fields => StructType(fields)
-  }
-
-  def parseSchema(schema: String): StructType = {
-    parse(structType, schema).get
-  }
-}
diff --git a/src/main/scala/com/databricks/spark/redshift/package.scala b/src/main/scala/com/databricks/spark/redshift/package.scala
index 1051f06a..a02cdd95 100644
--- a/src/main/scala/com/databricks/spark/redshift/package.scala
+++ b/src/main/scala/com/databricks/spark/redshift/package.scala
@@ -17,10 +17,9 @@
 
 package com.databricks.spark
 
-import com.amazonaws.services.s3.AmazonS3Client
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 package object redshift {
 
@@ -45,50 +44,13 @@ package object redshift {
     }
 
     /**
-     * Reads a table unload from Redshift with its schema in format "name0 type0 name1 type1 ...".
+     * Reads a table unload from Redshift with its schema.
      */
-    @deprecated("Use data sources API or perform string -> data type casts yourself", "0.5.0")
-    def redshiftFile(path: String, schema: String): DataFrame = {
-      val structType = SchemaParser.parseSchema(schema)
-      val casts = structType.fields.map { field =>
+    def redshiftFile(path: String, schema: StructType): DataFrame = {
+      val casts = schema.fields.map { field =>
         col(field.name).cast(field.dataType).as(field.name)
       }
-      redshiftFile(path, structType.fieldNames).select(casts: _*)
-    }
-
-    /**
-     * Read a Redshift table into a DataFrame, using S3 for data transfer and JDBC
-     * to control Redshift and resolve the schema
-     */
-    @deprecated("Use sqlContext.read()", "0.5.0")
-    def redshiftTable(parameters: Map[String, String]): DataFrame = {
-      val params = Parameters.mergeParameters(parameters)
-      sqlContext.baseRelationToDataFrame(
-        RedshiftRelation(
-          DefaultJDBCWrapper, creds => new AmazonS3Client(creds), params, None)(sqlContext))
-    }
-  }
-
-  /**
-   * Add write functionality to DataFrame
-   */
-  @deprecated("Use DataFrame.write()", "0.5.0")
-  implicit class RedshiftDataFrame(dataFrame: DataFrame) {
-
-    /**
-     * Load the DataFrame into a Redshift database table. By default, this will append to the
-     * specified table. If the `overwrite` parameter is set to `true` then this will drop the
-     * existing table and re-create it with the contents of this DataFrame.
-     */
-    @deprecated("Use DataFrame.write()", "0.5.0")
-    def saveAsRedshiftTable(parameters: Map[String, String]): Unit = {
-      val params = Parameters.mergeParameters(parameters)
-      val saveMode = if (params.overwrite) {
-        SaveMode.Overwrite
-      } else {
-        SaveMode.Append
-      }
-      DefaultRedshiftWriter.saveToRedshift(dataFrame.sqlContext, dataFrame, saveMode, params)
+      redshiftFile(path, schema.fieldNames).select(casts: _*)
     }
   }
 }
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftInputFormatSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftInputFormatSuite.scala
index 200dd1bb..28467c1d 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftInputFormatSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftInputFormatSuite.scala
@@ -126,15 +126,7 @@ class RedshiftInputFormatSuite extends FunSuite with BeforeAndAfterAll {
     val escaped = escape(testRecords.map(_.map(_.toString)), DEFAULT_DELIMITER)
     writeToFile(escaped, new File(dir, "part-00000"))
 
-    val conf = new Configuration
-    conf.setLong(KEY_BLOCK_SIZE, 4)
-
     val sqlContext = new SQLContext(sc)
-
-    val srdd = sqlContext.redshiftFile(
-      dir.toString,
-      "name varchar(10) state text id integer score float big_score numeric(4, 0) " +
-        "some_long bigint")
     val expectedSchema = StructType(Seq(
       StructField("name", StringType, nullable = true),
       StructField("state", StringType, nullable = true),
@@ -142,11 +134,14 @@
       StructField("score", DoubleType, nullable = true),
       StructField("big_score", LongType, nullable = true),
       StructField("some_long", LongType, nullable = true)))
-    assert(srdd.schema === expectedSchema)
-    val parsed = srdd.rdd.map {
-      case Row(name: String, state: String, id: Int, score: Double,
-        bigScore: Long, someLong: Long) =>
-        Seq(name, state, id, score, bigScore, someLong)
+
+    val df = sqlContext.redshiftFile(dir.toString, expectedSchema)
+    assert(df.schema === expectedSchema)
+
+    val parsed = df.rdd.map {
+      case Row(
+        name: String, state: String, id: Int, score: Double, bigScore: Long, someLong: Long
+      ) => Seq(name, state, id, score, bigScore, someLong)
     }.collect().toSet
 
     assert(parsed === testRecords)
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index b8750cc4..020eae83 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -513,7 +513,7 @@ class RedshiftSourceSuite
       "query" -> "select * from test_table")
 
     val e1 = intercept[IllegalArgumentException] {
-      expectedDataDF.saveAsRedshiftTable(invalidParams)
+      expectedDataDF.write.format("com.databricks.spark.redshift").options(invalidParams).save()
     }
     assert(e1.getMessage.contains("dbtable"))
   }
@@ -522,12 +522,12 @@ class RedshiftSourceSuite
     val invalidParams = Map("dbtable" -> "foo") // missing tempdir and url
 
     val e1 = intercept[IllegalArgumentException] {
-      expectedDataDF.saveAsRedshiftTable(invalidParams)
+      expectedDataDF.write.format("com.databricks.spark.redshift").options(invalidParams).save()
     }
     assert(e1.getMessage.contains("tempdir"))
 
     val e2 = intercept[IllegalArgumentException] {
-      testSqlContext.redshiftTable(invalidParams)
+      expectedDataDF.write.format("com.databricks.spark.redshift").options(invalidParams).save()
     }
     assert(e2.getMessage.contains("tempdir"))
   }
@@ -539,7 +539,11 @@ class RedshiftSourceSuite
   test("Saves throw error message if S3 Block FileSystem would be used") {
     val params = defaultParams + ("tempdir" -> defaultParams("tempdir").replace("s3n", "s3"))
     val e = intercept[IllegalArgumentException] {
-      expectedDataDF.saveAsRedshiftTable(params)
+      expectedDataDF.write
+        .format("com.databricks.spark.redshift")
+        .mode("append")
+        .options(params)
+        .save()
     }
     assert(e.getMessage.contains("Block FileSystem"))
   }
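For callers migrating off the APIs removed in this patch, here is a minimal sketch of the replacement calls, following the updated tests above and the original deprecation notes ("Use DataFrame.write()", "Use sqlContext.read()", "Use SaveMode instead"). The connection parameters, table names, and S3 paths below are placeholders, not values taken from this patch, and the `import com.databricks.spark.redshift._` line assumes the package object's implicit `SQLContext` wrapper is what brings `redshiftFile` into scope.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import com.databricks.spark.redshift._  // implicit SQLContext wrapper providing redshiftFile

object MigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("redshift-migration-sketch"))
    val sqlContext = new SQLContext(sc)

    // Placeholder connection parameters; substitute real values.
    val params = Map(
      "url" -> "jdbc:redshift://host:5439/db?user=user&password=pass",
      "dbtable" -> "my_table",
      "tempdir" -> "s3n://my-bucket/tmp")

    // Reading an unloaded file: the string schema ("name0 type0 name1 type1 ...")
    // is gone; pass an explicit StructType instead.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("id", IntegerType, nullable = true)))
    val unloaded = sqlContext.redshiftFile("s3n://my-bucket/unload/", schema)

    // Reading a table: replaces sqlContext.redshiftTable(params).
    val table = sqlContext.read
      .format("com.databricks.spark.redshift")
      .options(params)
      .load()

    // Writing: replaces df.saveAsRedshiftTable(params); the removed "overwrite"
    // parameter becomes an explicit SaveMode ("append" or "overwrite").
    table.write
      .format("com.databricks.spark.redshift")
      .mode("append")
      .options(params)
      .save()
  }
}
```

As the last test change above indicates, `tempdir` should use a non-block S3 filesystem scheme such as `s3n`; writes against the block `s3` scheme are rejected with an error.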