From cc2c9c0a298f866eaa3e9133a478c0eb01656b2a Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Fri, 11 Sep 2015 16:15:40 -0400
Subject: [PATCH 1/7] Add nullToken to csvFile options; add switch to treat
 empty values as nulls when reading csv.

---
 .../com/databricks/spark/csv/CsvParser.scala  |  8 +++++++
 .../databricks/spark/csv/CsvRelation.scala    |  3 ++-
 .../databricks/spark/csv/DefaultSource.scala  |  9 +++++++
 .../com/databricks/spark/csv/package.scala    | 10 ++++++--
 .../databricks/spark/csv/util/TypeCast.scala  |  4 ++--
 .../com/databricks/spark/csv/CsvSuite.scala   | 24 +++++++++++++++++++
 6 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala
index 1458a69..a2b6a49 100644
--- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala
@@ -35,6 +35,7 @@ class CsvParser extends Serializable {
   private var parseMode: String = ParseModes.DEFAULT
   private var ignoreLeadingWhiteSpace: Boolean = false
   private var ignoreTrailingWhiteSpace: Boolean = false
+  private var treatEmptyValuesAsNulls: Boolean = false
   private var parserLib: String = ParserLibs.DEFAULT
   private var charset: String = TextFile.DEFAULT_CHARSET.name()
   private var inferSchema: Boolean = false
@@ -84,6 +85,11 @@ class CsvParser extends Serializable {
     this
   }

+  def withTreatEmptyValuesAsNulls(treatAsNull: Boolean): CsvParser = {
+    this.treatEmptyValuesAsNulls = treatAsNull
+    this
+  }
+
   def withParserLib(parserLib: String): CsvParser = {
     this.parserLib = parserLib
     this
@@ -114,6 +120,7 @@ class CsvParser extends Serializable {
       parserLib,
       ignoreLeadingWhiteSpace,
       ignoreTrailingWhiteSpace,
+      treatEmptyValuesAsNulls,
       schema,
       inferSchema)(sqlContext)
     sqlContext.baseRelationToDataFrame(relation)
@@ -133,6 +140,7 @@ class CsvParser extends Serializable {
       parserLib,
       ignoreLeadingWhiteSpace,
       ignoreTrailingWhiteSpace,
+      treatEmptyValuesAsNulls,
       schema,
       inferSchema)(sqlContext)
     sqlContext.baseRelationToDataFrame(relation)
diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index fd7564f..ddea571 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -43,6 +43,7 @@ case class CsvRelation protected[spark] (
     parserLib: String,
     ignoreLeadingWhiteSpace: Boolean,
     ignoreTrailingWhiteSpace: Boolean,
+    treatEmptyValuesAsNulls: Boolean,
     userSchema: StructType = null,
     inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
   extends BaseRelation with TableScan with InsertableRelation {
@@ -114,7 +115,7 @@ case class CsvRelation protected[spark] (
         index = 0
         while (index < schemaFields.length) {
           val field = schemaFields(index)
-          rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable)
+          rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable, treatEmptyValuesAsNulls)
           index = index + 1
         }
         Some(Row.fromSeq(rowArray))
diff --git a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala
index c89b5ef..e7c6888 100755
--- a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala
+++ b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala
@@ -110,6 +110,14 @@ class DefaultSource
     } else {
       throw new Exception("Ignore white space flag can be true or false")
     }
+    val treatEmptyValuesAsNulls = parameters.getOrElse("treatEmptyValuesAsNulls", "false")
+    val treatEmptyValuesAsNullsFlag = if(treatEmptyValuesAsNulls == "false") {
+      false
+    } else if(treatEmptyValuesAsNulls == "true") {
+      true
+    } else {
+      throw new Exception("Treat empty values as null flag can be true or false")
+    }

     val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
     // TODO validate charset?
@@ -135,6 +143,7 @@ class DefaultSource
       parserLib,
       ignoreLeadingWhiteSpaceFlag,
       ignoreTrailingWhiteSpaceFlag,
+      treatEmptyValuesAsNullsFlag,
       schema,
       inferSchemaFlag)(sqlContext)
   }
diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala
index e39c86a..e32a998 100755
--- a/src/main/scala/com/databricks/spark/csv/package.scala
+++ b/src/main/scala/com/databricks/spark/csv/package.scala
@@ -37,6 +37,7 @@ package object csv {
       parserLib: String = "COMMONS",
       ignoreLeadingWhiteSpace: Boolean = false,
       ignoreTrailingWhiteSpace: Boolean = false,
+      treatEmptyValuesAsNulls: Boolean = false,
       charset: String = TextFile.DEFAULT_CHARSET.name(),
       inferSchema: Boolean = false) = {
     val csvRelation = CsvRelation(
@@ -51,6 +52,7 @@ package object csv {
       parserLib = parserLib,
       ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
       ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
+      treatEmptyValuesAsNulls = treatEmptyValuesAsNulls,
       inferCsvSchema = inferSchema)(sqlContext)
     sqlContext.baseRelationToDataFrame(csvRelation)
   }
@@ -60,6 +62,7 @@ package object csv {
       parserLib: String = "COMMONS",
       ignoreLeadingWhiteSpace: Boolean = false,
       ignoreTrailingWhiteSpace: Boolean = false,
+      treatEmptyValuesAsNulls: Boolean = false,
       charset: String = TextFile.DEFAULT_CHARSET.name(),
       inferSchema: Boolean = false) = {
     val csvRelation = CsvRelation(
@@ -74,6 +77,7 @@ package object csv {
       parserLib = parserLib,
       ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
       ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
+      treatEmptyValuesAsNulls = treatEmptyValuesAsNulls,
       inferCsvSchema = inferSchema)(sqlContext)
     sqlContext.baseRelationToDataFrame(csvRelation)
   }
@@ -114,11 +118,13 @@ package object csv {
       case None => None
     }

+    val nullToken = parameters.getOrElse("nullToken", "null")
+
     val csvFormatBase = CSVFormat.DEFAULT
       .withDelimiter(delimiterChar)
       .withEscape(escapeChar)
       .withSkipHeaderRecord(false)
-      .withNullString("null")
+      .withNullString(nullToken)

     val csvFormat = quoteChar match {
       case Some(c) => csvFormatBase.withQuote(c)
@@ -137,7 +143,7 @@ package object csv {
       .withDelimiter(delimiterChar)
       .withEscape(escapeChar)
       .withSkipHeaderRecord(false)
-      .withNullString("null")
+      .withNullString(nullToken)

     val csvFormat = quoteChar match {
       case Some(c) => csvFormatBase.withQuote(c)
diff --git a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
index c3f4de2..24841de 100644
--- a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
+++ b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
@@ -35,8 +35,8 @@ object TypeCast {
    * @param datum string value
    * @param castType SparkSQL type
    */
-  private[csv] def castTo(datum: String, castType: DataType, nullable: Boolean = true): Any = {
-    if (datum == "" && nullable && !castType.isInstanceOf[StringType]){
+  private[csv] def castTo(datum: String, castType: DataType, nullable: Boolean = true, treatEmptyValuesAsNulls: Boolean = false): Any = {
+    if (datum == "" && nullable && (!castType.isInstanceOf[StringType] || treatEmptyValuesAsNulls)){
       null
     } else {
       castType match {
diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
index 230d6e2..93b5ebe 100755
--- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
+++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
@@ -163,6 +163,30 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
     assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt"))
   }

+  test("DSL test roundtrip nulls") {
+    // Create temp directory
+    TestUtils.deleteRecursively(new File(tempEmptyDir))
+    new File(tempEmptyDir).mkdirs()
+    val copyFilePath = tempEmptyDir + "null-numbers.csv"
+    val agesSchema = StructType(List(StructField("name", StringType, true),
+      StructField("age", IntegerType, true)))
+
+    val agesRows = Seq(Row("alice", 35), Row("bob", null), Row(null, 24))
+    val agesRdd = sqlContext.sparkContext.parallelize(agesRows)
+    val agesDf = sqlContext.createDataFrame(agesRdd, agesSchema)
+
+    agesDf.saveAsCsvFile(copyFilePath, Map("header" -> "true", "nullToken" -> ""))
+
+    val agesCopy = new CsvParser()
+      .withSchema(agesSchema)
+      .withUseHeader(true)
+      .withTreatEmptyValuesAsNulls(true)
+      .withParserLib(parserLib)
+      .csvFile(sqlContext, copyFilePath)
+
+    assert(agesCopy.count == agesRows.size)
+    assert(agesCopy.collect.toSeq == agesRows)
+  }
+
   test("DSL test with alternative delimiter and quote") {
     val results = new CsvParser()
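With patch 1 in place, empty CSV fields can come back as nulls on read. A minimal usage sketch of the builder API added above, assuming spark-csv and its dependencies are on the classpath and an input file with a header containing a "name" column; the app name, master, and path are illustrative, not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import com.databricks.spark.csv.CsvParser

    object TreatEmptyAsNullDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("empty-as-null").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)

        // withTreatEmptyValuesAsNulls(true) makes empty tokens null even for
        // StringType columns; without it they would be read back as "".
        val df = new CsvParser()
          .withUseHeader(true)
          .withTreatEmptyValuesAsNulls(true)
          .csvFile(sqlContext, "/tmp/people.csv") // hypothetical input path

        df.filter("name is null").show()
        sc.stop()
      }
    }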
From ef89f78785def334f2626f756432311a931787a0 Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Tue, 15 Sep 2015 11:48:52 -0400
Subject: [PATCH 2/7] Keep lines to max 100 characters.

---
 src/main/scala/com/databricks/spark/csv/CsvRelation.scala   | 3 ++-
 src/main/scala/com/databricks/spark/csv/util/TypeCast.scala | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index ddea571..0f83b13 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -115,7 +115,8 @@ case class CsvRelation protected[spark] (
         index = 0
         while (index < schemaFields.length) {
           val field = schemaFields(index)
-          rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable, treatEmptyValuesAsNulls)
+          rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable,
+            treatEmptyValuesAsNulls)
           index = index + 1
         }
         Some(Row.fromSeq(rowArray))
diff --git a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
index 24841de..e6eb7ca 100644
--- a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
+++ b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
@@ -35,7 +35,8 @@ object TypeCast {
    * @param datum string value
    * @param castType SparkSQL type
    */
-  private[csv] def castTo(datum: String, castType: DataType, nullable: Boolean = true, treatEmptyValuesAsNulls: Boolean = false): Any = {
+  private[csv] def castTo(datum: String, castType: DataType, nullable: Boolean = true,
+      treatEmptyValuesAsNulls: Boolean = false): Any = {
     if (datum == "" && nullable && (!castType.isInstanceOf[StringType] || treatEmptyValuesAsNulls)){
       null
     } else {
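The condition wrapped above is the core rule of the whole series and reads as: an empty token becomes null when the field is nullable and either the target type is not StringType (the pre-existing behavior) or treatEmptyValuesAsNulls is set (the new opt-in for strings). A standalone restatement of that decision table, for illustration only; castTo itself is private[csv], so these names are not the library's public API:

    import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

    object EmptyTokenRule {
      // Mirrors the null-decision in TypeCast.castTo as of this series.
      def emptyBecomesNull(castType: DataType, nullable: Boolean,
          treatEmptyValuesAsNulls: Boolean): Boolean =
        nullable && (!castType.isInstanceOf[StringType] || treatEmptyValuesAsNulls)

      def main(args: Array[String]): Unit = {
        // Non-string nullable fields: "" was already read as null.
        assert(emptyBecomesNull(IntegerType, nullable = true, treatEmptyValuesAsNulls = false))
        // String fields kept "" by default...
        assert(!emptyBecomesNull(StringType, nullable = true, treatEmptyValuesAsNulls = false))
        // ...and become null only with the new switch.
        assert(emptyBecomesNull(StringType, nullable = true, treatEmptyValuesAsNulls = true))
        // Non-nullable fields are never nulled.
        assert(!emptyBecomesNull(StringType, nullable = false, treatEmptyValuesAsNulls = true))
      }
    }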
From 678814f42a39c1889f904725dc42dcc01011c243 Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Tue, 15 Sep 2015 11:58:57 -0400
Subject: [PATCH 3/7] Ignore order of read values in CsvSuite test.

---
 src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
index 93b5ebe..f68a573 100755
--- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
+++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
@@ -185,7 +185,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
       .csvFile(sqlContext, copyFilePath)

     assert(agesCopy.count == agesRows.size)
-    assert(agesCopy.collect.toSeq == agesRows)
+    assert(agesCopy.collect.toSet == agesRows.toSet)
   }

   test("DSL test with alternative delimiter and quote") {

From 877bb615e06dc2fab8f4f6e1910d505c917a737f Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Wed, 16 Sep 2015 11:27:12 -0400
Subject: [PATCH 4/7] Rename nullToken to nullValue.

---
 src/main/scala/com/databricks/spark/csv/package.scala  | 6 +++---
 src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala
index e32a998..0a51ab3 100755
--- a/src/main/scala/com/databricks/spark/csv/package.scala
+++ b/src/main/scala/com/databricks/spark/csv/package.scala
@@ -118,13 +118,13 @@ package object csv {
       case None => None
     }

-    val nullToken = parameters.getOrElse("nullToken", "null")
+    val nullValue = parameters.getOrElse("nullValue", "null")

     val csvFormatBase = CSVFormat.DEFAULT
       .withDelimiter(delimiterChar)
       .withEscape(escapeChar)
       .withSkipHeaderRecord(false)
-      .withNullString(nullToken)
+      .withNullString(nullValue)

     val csvFormat = quoteChar match {
       case Some(c) => csvFormatBase.withQuote(c)
@@ -143,7 +143,7 @@ package object csv {
       .withDelimiter(delimiterChar)
       .withEscape(escapeChar)
       .withSkipHeaderRecord(false)
-      .withNullString(nullToken)
+      .withNullString(nullValue)

     val csvFormat = quoteChar match {
       case Some(c) => csvFormatBase.withQuote(c)
diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
index f68a573..2da4947 100755
--- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
+++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
@@ -175,7 +175,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
     val agesRdd = sqlContext.sparkContext.parallelize(agesRows)
     val agesDf = sqlContext.createDataFrame(agesRdd, agesSchema)

-    agesDf.saveAsCsvFile(copyFilePath, Map("header" -> "true", "nullToken" -> ""))
+    agesDf.saveAsCsvFile(copyFilePath, Map("header" -> "true", "nullValue" -> ""))

     val agesCopy = new CsvParser()
       .withSchema(agesSchema)
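After the rename, the write path accepts a nullValue option (wired to commons-csv's withNullString), so null fields can be serialized as any chosen token; pairing nullValue = "" on write with treatEmptyValuesAsNulls on read is what makes the round-trip test above pass. A short write-side sketch, assuming an existing DataFrame df and a hypothetical output path:

    import com.databricks.spark.csv._ // provides saveAsCsvFile via implicits
    import org.apache.spark.sql.DataFrame

    // Null fields are written as empty strings instead of the default "null" token.
    def writeWithEmptyNulls(df: DataFrame, path: String): Unit =
      df.saveAsCsvFile(path, Map("header" -> "true", "nullValue" -> ""))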
From 67852858a5a7bb0634e2c5e99c9be1ab1cea94a6 Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Sun, 20 Sep 2015 11:26:45 -0400
Subject: [PATCH 5/7] Remove treatEmptyValuesAsNulls option from csvFile and
 tsvFile methods to keep MiMa compatibility.

---
 src/main/scala/com/databricks/spark/csv/package.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala
index 8b248a4..8c5b5af 100755
--- a/src/main/scala/com/databricks/spark/csv/package.scala
+++ b/src/main/scala/com/databricks/spark/csv/package.scala
@@ -38,7 +38,6 @@ package object csv {
       parserLib: String = "COMMONS",
       ignoreLeadingWhiteSpace: Boolean = false,
       ignoreTrailingWhiteSpace: Boolean = false,
-      treatEmptyValuesAsNulls: Boolean = false,
       charset: String = TextFile.DEFAULT_CHARSET.name(),
       inferSchema: Boolean = false): DataFrame = {
     val csvRelation = CsvRelation(
@@ -53,7 +52,7 @@ package object csv {
       parserLib = parserLib,
       ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
       ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
-      treatEmptyValuesAsNulls = treatEmptyValuesAsNulls,
+      treatEmptyValuesAsNulls = false,
       inferCsvSchema = inferSchema)(sqlContext)
     sqlContext.baseRelationToDataFrame(csvRelation)
   }
@@ -64,7 +63,6 @@ package object csv {
       parserLib: String = "COMMONS",
       ignoreLeadingWhiteSpace: Boolean = false,
       ignoreTrailingWhiteSpace: Boolean = false,
-      treatEmptyValuesAsNulls: Boolean = false,
       charset: String = TextFile.DEFAULT_CHARSET.name(),
       inferSchema: Boolean = false): DataFrame = {
     val csvRelation = CsvRelation(
@@ -79,7 +77,7 @@ package object csv {
       parserLib = parserLib,
       ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
       ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
-      treatEmptyValuesAsNulls = treatEmptyValuesAsNulls,
+      treatEmptyValuesAsNulls = false,
       inferCsvSchema = inferSchema)(sqlContext)
     sqlContext.baseRelationToDataFrame(csvRelation)
   }
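Since csvFile and tsvFile keep their old parameter lists for binary compatibility, callers who cannot use the CsvParser builder can still opt in through the options map that DefaultSource has parsed since patch 1. A sketch using the generic DataFrameReader available since Spark 1.4; the path is hypothetical:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def readEmptyAsNull(sqlContext: SQLContext, path: String): DataFrame =
      sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true") // validated in DefaultSource
        .load(path)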
From 01fa79c31c65b15440223892fe60943787000d26 Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Mon, 21 Sep 2015 12:22:57 -0400
Subject: [PATCH 6/7] Style fix on castTo method.

---
 .../scala/com/databricks/spark/csv/util/TypeCast.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
index e6eb7ca..265515e 100644
--- a/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
+++ b/src/main/scala/com/databricks/spark/csv/util/TypeCast.scala
@@ -35,8 +35,11 @@ object TypeCast {
    * @param datum string value
    * @param castType SparkSQL type
    */
-  private[csv] def castTo(datum: String, castType: DataType, nullable: Boolean = true,
-      treatEmptyValuesAsNulls: Boolean = false): Any = {
+  private[csv] def castTo(
+      datum: String,
+      castType: DataType,
+      nullable: Boolean = true,
+      treatEmptyValuesAsNulls: Boolean = false): Any = {
     if (datum == "" && nullable && (!castType.isInstanceOf[StringType] || treatEmptyValuesAsNulls)){
       null
     } else {

From 176d4e8534641f00735f4d4ae630f81935ea639f Mon Sep 17 00:00:00 2001
From: Andres Perez
Date: Sun, 4 Oct 2015 13:11:18 -0400
Subject: [PATCH 7/7] Exclude TypeCast from MiMa checks.

---
 build.sbt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/build.sbt b/build.sbt
index 2851c88..992a103 100755
--- a/build.sbt
+++ b/build.sbt
@@ -88,6 +88,7 @@ mimaDefaultSettings ++ Seq(
     ProblemFilters.excludePackage("com.databricks.spark.csv.CsvRelation"),
     ProblemFilters.excludePackage("com.databricks.spark.csv.util.InferSchema"),
     ProblemFilters.excludePackage("com.databricks.spark.sql.readers"),
+    ProblemFilters.excludePackage("com.databricks.spark.csv.util.TypeCast"),
     // We allowed the private `CsvRelation` type to leak into the public method signature:
     ProblemFilters.exclude[IncompatibleResultTypeProblem](
       "com.databricks.spark.csv.DefaultSource.createRelation")
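Taken together, the series makes nulls survive a CSV round trip: written out as empty fields (nullValue set to "") and read back as nulls (treatEmptyValuesAsNulls). A self-contained sketch mirroring the suite's round-trip test, with an illustrative local path and master:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import com.databricks.spark.csv._

    object NullRoundTrip {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("null-round-trip").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)

        val schema = StructType(List(
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
        val rows = Seq(Row("alice", 35), Row("bob", null), Row(null, 24))
        val df = sqlContext.createDataFrame(sc.parallelize(rows), schema)

        val path = "/tmp/null-numbers.csv" // hypothetical output directory
        df.saveAsCsvFile(path, Map("header" -> "true", "nullValue" -> ""))

        val back = new CsvParser()
          .withSchema(schema)
          .withUseHeader(true)
          .withTreatEmptyValuesAsNulls(true)
          .csvFile(sqlContext, path)

        // Row order across part files is not guaranteed, hence the Set
        // comparison (the same reason patch 3 changed the test assertion).
        assert(back.collect.toSet == rows.toSet)
        sc.stop()
      }
    }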