17 changes: 15 additions & 2 deletions src/main/scala/com/databricks/spark/csv/CsvParser.scala
@@ -42,6 +42,7 @@ class CsvParser extends Serializable {
private var codec: String = null
private var nullValue: String = ""
private var dateFormat: String = null
private var treatParseExceptionAsNull : Boolean = false

def withUseHeader(flag: Boolean): CsvParser = {
this.useHeader = flag
@@ -123,6 +124,16 @@ class CsvParser extends Serializable {
this
}

/**
* If this is set to true, then dirty data (for example, a string in a numeric
* column or a malformed date) will not cause a failure.
* Instead, that value will be null in the resulting data.
*/
def withTreatParseExceptionAsNull(flag : Boolean) : CsvParser = {
this.treatParseExceptionAsNull = flag
this
}

/** Returns a Schema RDD for the given CSV path. */
@throws[RuntimeException]
def csvFile(sqlContext: SQLContext, path: String): DataFrame = {
@@ -143,7 +154,8 @@
inferSchema,
codec,
nullValue,
dateFormat)(sqlContext)
dateFormat,
treatParseExceptionAsNull)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}

@@ -165,7 +177,8 @@
inferSchema,
codec,
nullValue,
dateFormat)(sqlContext)
dateFormat,
treatParseExceptionAsNull)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}
}
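
For context, a minimal usage sketch of the new builder option, adapted from the "Parse Exception with Schema" test added below; carsSchema and the file path are placeholders supplied by the caller:

// Sketch: with the flag on, a value that fails to cast (e.g. "new" in an
// IntegerType column) yields null in the row instead of failing the scan.
val df = new CsvParser()
  .withSchema(carsSchema)                 // a typed StructType defined by the caller
  .withUseHeader(true)
  .withTreatParseExceptionAsNull(true)
  .csvFile(sqlContext, "src/test/resources/cars_dirty.csv")
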
10 changes: 7 additions & 3 deletions src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -49,7 +49,9 @@ case class CsvRelation protected[spark] (
inferCsvSchema: Boolean,
codec: String = null,
nullValue: String = "",
dateFormat: String = null)(@transient val sqlContext: SQLContext)
dateFormat: String = null,
treatParseExceptionAsNull: Boolean)
(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with PrunedScan with InsertableRelation {

// Share date format object as it is expensive to parse date pattern.
@@ -118,7 +120,8 @@ case class CsvRelation protected[spark] (
while (index < schemaFields.length) {
val field = schemaFields(index)
rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable,
treatEmptyValuesAsNulls, nullValue, simpleDateFormatter)
treatEmptyValuesAsNulls, nullValue, simpleDateFormatter,
treatParseExceptionAsNull)
index = index + 1
}
Some(Row.fromSeq(rowArray))
@@ -197,7 +200,8 @@ case class CsvRelation protected[spark] (
field.nullable,
treatEmptyValuesAsNulls,
nullValue,
simpleDateFormatter
simpleDateFormatter,
treatParseExceptionAsNull
)
subIndex = subIndex + 1
}
13 changes: 12 additions & 1 deletion src/main/scala/com/databricks/spark/csv/DefaultSource.scala
@@ -125,6 +125,16 @@ class DefaultSource
throw new Exception("Treat empty values as null flag can be true or false")
}

val treatParseExceptionAsNull = parameters.getOrElse(
Member comment: Please update README file to include this new option.
"treatParseExceptionAsNull", "false")
Member comment: I suggest changing the name of this option to something like insertNullOnErrors, primarily because this data source is used by R and Python users, and exceptions are not a familiar concept for them.
val treatParseExceptionAsNullFlag = if (treatParseExceptionAsNull == "false"){
false
} else if (treatParseExceptionAsNull == "true") {
true
} else {
throw new Exception("Treat parse exception as null flag can be true or false")
}

val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
// TODO validate charset?

@@ -159,7 +169,8 @@ class DefaultSource
inferSchemaFlag,
codec,
nullValue,
dateFormat)(sqlContext)
dateFormat,
treatParseExceptionAsNullFlag)(sqlContext)
}

override def createRelation(
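
The same behavior can be requested through the Spark SQL data source API via the string option parsed above; a sketch, assuming an existing sqlContext and a placeholder path:

// Sketch: the option key matches the one read by DefaultSource above;
// like the other boolean options here, it is passed as "true"/"false".
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatParseExceptionAsNull", "true")
  .load("path/to/dirty.csv")
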
6 changes: 4 additions & 2 deletions src/main/scala/com/databricks/spark/csv/package.scala
@@ -56,7 +56,8 @@ package object csv {
ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
treatEmptyValuesAsNulls = false,
inferCsvSchema = inferSchema)(sqlContext)
inferCsvSchema = inferSchema,
treatParseExceptionAsNull = false)(sqlContext)
sqlContext.baseRelationToDataFrame(csvRelation)
}

@@ -81,7 +82,8 @@ package object csv {
ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
treatEmptyValuesAsNulls = false,
inferCsvSchema = inferSchema)(sqlContext)
inferCsvSchema = inferSchema,
treatParseExceptionAsNull = false)(sqlContext)
sqlContext.baseRelationToDataFrame(csvRelation)
}
}
54 changes: 34 additions & 20 deletions src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
@@ -21,6 +21,7 @@ import java.text.{SimpleDateFormat, NumberFormat}
import java.util.Locale

import org.apache.spark.sql.types._
import org.json4s.ParserUtil.ParseException
Member comment: This import is not used and not needed.

import scala.util.Try

@@ -45,7 +46,8 @@ object TypeCast {
nullable: Boolean = true,
treatEmptyValuesAsNulls: Boolean = false,
nullValue: String = "",
dateFormatter: SimpleDateFormat = null): Any = {
dateFormatter: SimpleDateFormat = null,
parseExceptionAsNull : Boolean = false): Any = {
// if nullValue is not an empty string, don't require treatEmptyValuesAsNulls
// to be set to true
val nullValueIsNotEmpty = nullValue != ""
@@ -55,25 +57,34 @@
){
null
} else {
castType match {
case _: ByteType => datum.toByte
case _: ShortType => datum.toShort
case _: IntegerType => datum.toInt
case _: LongType => datum.toLong
case _: FloatType => Try(datum.toFloat)
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
case _: DoubleType => Try(datum.toDouble)
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
case _: BooleanType => datum.toBoolean
case _: DecimalType => new BigDecimal(datum.replaceAll(",", ""))
case _: TimestampType if dateFormatter != null =>
new Timestamp(dateFormatter.parse(datum).getTime)
case _: TimestampType => Timestamp.valueOf(datum)
case _: DateType if dateFormatter != null =>
new Date(dateFormatter.parse(datum).getTime)
case _: DateType => Date.valueOf(datum)
case _: StringType => datum
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
try {
castType match {
case _: ByteType => datum.toByte
case _: ShortType => datum.toShort
case _: IntegerType => datum.toInt
case _: LongType => datum.toLong
case _: FloatType => Try(datum.toFloat)
.getOrElse(NumberFormat
Member comment: Fix indent here and below in this block.
.getInstance(Locale.getDefault).parse(datum).floatValue())
case _: DoubleType => Try(datum.toDouble)
.getOrElse(NumberFormat.getInstance(Locale.getDefault)
.parse(datum).doubleValue())
case _: BooleanType => datum.toBoolean
case _: DecimalType => new BigDecimal(datum.replaceAll(",", ""))
case _: TimestampType if dateFormatter != null =>
new Timestamp(dateFormatter.parse(datum).getTime)
case _: TimestampType => Timestamp.valueOf(datum)
case _: DateType if dateFormatter != null =>
new Date(dateFormatter.parse(datum).getTime)
case _: DateType => Date.valueOf(datum)
case _: StringType => datum
case _ => throw new UnsupportedTypeException(s"Unsupported type: ${castType.typeName}")
Member comment (HyukjinKwon, Jul 13, 2016): Could we maybe just do this, for example, as below without try/catch:

case _ if (parseExceptionAsNull && nullable) => null
case _ => throw new UnsupportedTypeException(s"Unsupported type: ${castType.typeName}")

Member comment (HyukjinKwon): Oh, nevermind. I got it.
}
}
catch {
case e : UnsupportedTypeException =>
Member comment: Fix spacing: case e: UnsupportedTypeException =>
throw e
case e => if (parseExceptionAsNull && nullable) null else throw e
Member comment: case e: Throwable => ...
}
}
}
@@ -106,3 +117,6 @@ object TypeCast {
}
}
}

class UnsupportedTypeException(message: String = null, cause: Throwable = null)
extends RuntimeException(message, cause)
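
A sketch of the resulting TypeCast.castTo semantics, mirroring the unit tests added below (positional arguments: datum, castType, nullable, treatEmptyValuesAsNulls, nullValue, dateFormatter, parseExceptionAsNull):

import org.apache.spark.sql.types.IntegerType
import com.databricks.spark.csv.util.TypeCast

TypeCast.castTo("10", IntegerType, true, false, "", null, true)   // => 10
TypeCast.castTo("abc", IntegerType, true, false, "", null, true)  // => null: the NumberFormatException is swallowed
// With nullable = false the flag has no effect and the exception propagates:
// TypeCast.castTo("abc", IntegerType, false, false, "", null, true)  // throws NumberFormatException
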
5 changes: 5 additions & 0 deletions src/test/resources/cars_dirty.csv
@@ -0,0 +1,5 @@
year,make,model,price,comment,blank
2012,Tesla,S"80,000.65"
2013.5,Ford,E350,35,000,"Go get one now they are going fast"
2015,,Volt,5,000
new,"",Volt,5000.00
45 changes: 44 additions & 1 deletion src/test/scala/com/databricks/spark/csv/CsvSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.Matchers._
abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
val carsFile = "src/test/resources/cars.csv"
val carsMalformedFile = "src/test/resources/cars-malformed.csv"
val carsDirtyTsvFile = "src/test/resources/cars_dirty.csv"
val carsFile8859 = "src/test/resources/cars_iso-8859-1.csv"
val carsTsvFile = "src/test/resources/cars.tsv"
val carsAltFile = "src/test/resources/cars-alternative.csv"
@@ -67,6 +68,12 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
super.afterAll()
}
}
test("Dirty Data CSV"){
val results = sqlContext.csvFile(
carsDirtyTsvFile, parserLib = parserLib
).collect()
assert(results.length == 4)
}

test("DSL test") {
val results = sqlContext
@@ -197,9 +204,44 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
.select("Name")
.collect().size

val r = new CsvParser()
.withSchema(strictSchema)
.withUseHeader(true)
.withParserLib(parserLib)
.withParseMode(ParseModes.DROP_MALFORMED_MODE)
.csvFile(sqlContext, ageFile)
.select("Name")
.collect()
Member comment (HyukjinKwon, Jul 13, 2016): We might have to fix the indentation here,

val r = new CsvParser()
  .withSchema(strictSchema)
  .withUseHeader(true)
  .withParserLib(parserLib)
  .withParseMode(ParseModes.DROP_MALFORMED_MODE)
  .csvFile(sqlContext, ageFile)
  .select("Name")
  .collect()

Member comment (HyukjinKwon, Jul 13, 2016): (maybe val parser instead of val r just to be consistent)

Member comment (HyukjinKwon, Jul 13, 2016): Sorry for noise. I took another look. It seems this val r is not related with this PR and not used?

assert(results === 1)
}

test("Parse Exception with Schema"){
val carsSchema = new StructType(
Array(
StructField("year", IntegerType, nullable = true),
StructField("make", StringType, nullable = true),
StructField("model", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("blank", IntegerType, nullable = true)
)
)

val results = new CsvParser()
.withSchema(carsSchema)
.withUseHeader(true)
.withDelimiter(',')
.withQuoteChar('\"').withTreatParseExceptionAsNull(true)
.csvFile(sqlContext, carsDirtyTsvFile).select("year", "make")
.collect()

assert(results(0).toSeq == Seq(2012, "Tesla"))
assert(results(1).toSeq == Seq(null, "Ford"))
assert(results(2).toSeq == Seq(2015, ""))
assert(results(3).toSeq == Seq(null, ""))
}

test("DSL test for FAILFAST parsing mode") {
val parser = new CsvParser()
.withParseMode(ParseModes.FAIL_FAST_MODE)
@@ -267,6 +309,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
.withDelimiter(',')
.withQuoteChar(null)
.withUseHeader(true)
.withNullValue("")
Member comment: I guess this one is not related with this PR. The default value will already be "".
.withParserLib(parserLib)
.csvFile(sqlContext, carsUnbalancedQuotesFile)
.select("year")
@@ -677,7 +720,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
assert(results.schema == StructType(List(
StructField("year", IntegerType, nullable = true),
StructField("make", StringType, nullable = true),
StructField("model", StringType ,nullable = true),
StructField("model", StringType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("blank", StringType, nullable = true))
))
46 changes: 46 additions & 0 deletions src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala
@@ -24,6 +24,8 @@ import org.scalatest.FunSuite

import org.apache.spark.sql.types._

import scala.util.Try

class TypeCastSuite extends FunSuite {

test("Can parse decimal type values") {
@@ -115,4 +117,48 @@ class TypeCastSuite extends FunSuite {
assert(TypeCast.castTo("", StringType, true, false, "") == "")
assert(TypeCast.castTo("", StringType, true, true, "") == null)
}

test("Parse exception is caught correctly"){
Member comment: Add space test("Parse exception is caught correctly") {

def testParseException( castType : DataType, badValues : Seq[String]): Unit = {
badValues.foreach(testValue => {
assert(TypeCast.castTo(testValue, castType, true, false, "", null, true) == null)
// if not nullable it isn't null
assert(Try(TypeCast.castTo(testValue, castType, false, false, "", null, true)).isFailure)
Member comment: Instead of using Try, catch the exception and then assert that it includes the correct message.

Author reply: This could be any manner of type casting exception though? It's anything thrown by the type conversion.
}
)
}
Member comment: Indentation and remove extra function here (see scala-style-guide#anonymous-methods), maybe as below:

def testParseException(castType : DataType, badValues : Seq[String]): Unit = {
  badValues.foreach { testValue =>
    assert(TypeCast.castTo(testValue, castType, true, false, "", null, true) == null)
    // if not nullable it isn't null
    assert(Try(TypeCast.castTo(testValue, castType, false, false, "", null, true)).isFailure)
  }
}


assert(TypeCast.castTo("10", ByteType, true, false, "", null, true) == 10)
testParseException(ByteType, Seq("10.5", "s", "true"))

assert(TypeCast.castTo("10", ShortType, true, false, "", null, true) == 10)
testParseException(ShortType, Seq("s", "true"))

assert(TypeCast.castTo("10", IntegerType, true, false, "", null, true) == 10)
testParseException(IntegerType, Seq("10.5", "s", "true"))

assert(TypeCast.castTo("10", LongType, true, false, "", null, true) == 10)
testParseException(LongType, Seq("10.5", "s", "true"))

assert(TypeCast.castTo("1.00", FloatType, true, false, "", null, true) == 1.0)
testParseException(FloatType, Seq("s", "true"))

assert(TypeCast.castTo("1.00", DoubleType, true, false, "", null, true) == 1.0)
testParseException(DoubleType, Seq("s", "true"))

assert(TypeCast.castTo("true", BooleanType, true, false, "", null, true) == true)
testParseException(BooleanType, Seq("s", "5"))

val timestamp = "2015-01-01 00:00:00"
assert(TypeCast.castTo(timestamp, TimestampType, true, false, "", null, true)
== Timestamp.valueOf(timestamp))
testParseException(TimestampType, Seq("5", "string"))

assert(TypeCast.castTo("2015-01-01", DateType, true, false, "", null, true)
== Date.valueOf("2015-01-01"))
testParseException(DateType, Seq("5", "string", timestamp))
}

Member comment: Extra lines

}