From 458c09787b1325f95896c7f79015f041c53770dc Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sat, 25 Aug 2018 12:42:03 -0500 Subject: [PATCH 1/9] Configurable empty values when reading/writing CSV files --- python/pyspark/sql/readwriter.py | 30 +++++---- python/pyspark/sql/streaming.py | 14 ++-- .../apache/spark/sql/DataFrameReader.scala | 1 + .../apache/spark/sql/DataFrameWriter.scala | 1 + .../datasources/csv/CSVOptions.scala | 7 +- .../sql/streaming/DataStreamReader.scala | 1 + .../resources/test-data/cars-empty-value.csv | 4 ++ .../execution/datasources/csv/CSVSuite.scala | 65 +++++++++++++++++++ 8 files changed, 102 insertions(+), 21 deletions(-) create mode 100644 sql/core/src/test/resources/test-data/cars-empty-value.csv diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 49f4e6b2ede1..8f50cd17a18d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -345,11 +345,11 @@ def text(self, paths, wholetext=False, lineSep=None): @since(2.0) def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, - ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, - columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, - samplingRatio=None, enforceSchema=None): + ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None, + positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, + maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, + mode=None, columnNameOfCorruptRecord=None, multiLine=None, + charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -395,6 +395,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. Since 2.0.1, this ``nullValue`` param applies to all supported types including the string type. + :param emptyValue: sets the string representation of an empty value. If None is set, it uses + the default value, empty string. :param nanValue: sets the string representation of a non-number value. If None is set, it uses the default value, ``NaN``. :param positiveInf: sets the string representation of a positive infinity value. 
If None @@ -457,9 +459,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, - nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, - maxCharsPerColumn=maxCharsPerColumn, + emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf, + negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, + maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio, @@ -857,9 +859,9 @@ def text(self, path, compression=None, lineSep=None): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, - header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, - timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, - charToEscapeQuoteEscaping=None, encoding=None): + header=None, nullValue=None, emptyValue=None, escapeQuotes=None, quoteAll=None, + dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, + ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -891,6 +893,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No the default value, ``false``. :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. + :param emptyValue: sets the string representation of an empty value. If None is set, it uses + the default value, ``""``. :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. 
If None is set, it uses the @@ -916,8 +920,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No """ self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, - nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, - dateFormat=dateFormat, timestampFormat=timestampFormat, + nullValue=nullValue, emptyValue=emptyValue, escapeQuotes=escapeQuotes, + quoteAll=quoteAll, dateFormat=dateFormat, timestampFormat=timestampFormat, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ee13778a7dcd..e3fbc9fa2f03 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -560,9 +560,9 @@ def text(self, path, wholetext=False, lineSep=None): @since(2.0) def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, - ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, + ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None, + positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, + maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, enforceSchema=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. @@ -611,6 +611,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. Since 2.0.1, this ``nullValue`` param applies to all supported types including the string type. + :param emptyValue: sets the string representation of an empty value. If None is set, it uses + the default value, empty string. :param nanValue: sets the string representation of a non-number value. If None is set, it uses the default value, ``NaN``. :param positiveInf: sets the string representation of a positive infinity value. 
If None @@ -669,9 +671,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, - nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, - maxCharsPerColumn=maxCharsPerColumn, + emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf, + negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, + maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0cfcc45fb3d3..e6c2cba79841 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -571,6 +571,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * whitespaces from values being read should be skipped. *
 * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
 * 2.0.1, this applies to all supported types including the string type.</li>
+ * <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
 * <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
 * <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
 * value.</li>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index eca2d5b97190..dfb8c4718550 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -635,6 +635,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * enclosed in quotes. Default is to only escape values containing a quote character.</li>
 * <li>`header` (default `false`): writes the names of columns as the first line.</li>
 * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+ * <li>`emptyValue` (default `""`): sets the string representation of an empty value.</li>
 * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
 * files. If it is not set, the UTF-8 charset will be used.</li>
 * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index fab8d62da0c1..f84f783604e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -117,6 +117,9 @@ class CSVOptions( val nullValue = parameters.getOrElse("nullValue", "") + val emptyValueInRead = parameters.getOrElse("emptyValue", "") + val emptyValueInWrite = parameters.getOrElse("emptyValue", "\"\"") + val nanValue = parameters.getOrElse("nanValue", "NaN") val positiveInf = parameters.getOrElse("positiveInf", "Inf") @@ -173,7 +176,7 @@ class CSVOptions( writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite) writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite) writerSettings.setNullValue(nullValue) - writerSettings.setEmptyValue("\"\"") + writerSettings.setEmptyValue(emptyValueInWrite) writerSettings.setSkipEmptyLines(true) writerSettings.setQuoteAllFields(quoteAll) writerSettings.setQuoteEscapingEnabled(escapeQuotes) @@ -194,7 +197,7 @@ class CSVOptions( settings.setInputBufferSize(inputBufferSize) settings.setMaxColumns(maxColumns) settings.setNullValue(nullValue) - settings.setEmptyValue("") + settings.setEmptyValue(emptyValueInRead) settings.setMaxCharsPerColumn(maxCharsPerColumn) settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER) settings diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 39e9e1ad426b..2a4db4afbe00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -327,6 +327,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * whitespaces from values being read should be skipped.</li>
 * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
 * 2.0.1, this applies to all supported types including the string type.</li>
+ * <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
 * <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
 * <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
 * value.</li>
diff --git a/sql/core/src/test/resources/test-data/cars-empty-value.csv b/sql/core/src/test/resources/test-data/cars-empty-value.csv new file mode 100644 index 000000000000..0f20a2f23ac0 --- /dev/null +++ b/sql/core/src/test/resources/test-data/cars-empty-value.csv @@ -0,0 +1,4 @@ +year,make,model,comment,blank +"2012","Tesla","S","","" +1997,Ford,E350,"Go get one now they are going fast", +2015,Chevy,Volt,,"" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 5a1d6679ebbd..9a375f6657b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -50,6 +50,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te private val carsAltFile = "test-data/cars-alternative.csv" private val carsUnbalancedQuotesFile = "test-data/cars-unbalanced-quotes.csv" private val carsNullFile = "test-data/cars-null.csv" + private val carsEmptyValueFile = "test-data/cars-empty-value.csv" private val carsBlankColName = "test-data/cars-blank-column-name.csv" private val emptyFile = "test-data/empty.csv" private val commentsFile = "test-data/comments.csv" @@ -668,6 +669,70 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null)) } + test("empty fields with user defined empty values") { + + // year,make,model,comment,blank + val dataSchema = StructType(List( + StructField("year", IntegerType, nullable = true), + StructField("make", StringType, nullable = false), + StructField("model", StringType, nullable = false), + StructField("comment", StringType, nullable = true), + StructField("blank", StringType, nullable = true))) + val cars = spark.read + .format("csv") + .schema(dataSchema) + .option("header", "true") + .option("emptyValue", "empty") + .load(testFile(carsEmptyValueFile)) + + verifyCars(cars, withHeader = true, checkValues = false) + val results = cars.collect() + assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty")) + assert(results(1).toSeq === + Array(1997, "Ford", "E350", "Go get one now they are going fast", null)) + assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty")) + } + + test("save csv with empty fields with user defined empty values") { + withTempDir { dir => + val csvDir = new File(dir, "csv").getCanonicalPath + + // year,make,model,comment,blank + val dataSchema = StructType(List( + StructField("year", IntegerType, nullable = true), + StructField("make", StringType, nullable = false), + StructField("model", StringType, nullable = false), + StructField("comment", StringType, nullable = true), + StructField("blank", StringType, nullable = true))) + val cars = spark.read + .format("csv") + .schema(dataSchema) + .option("header", "true") + .option("nullValue", "NULL") + .load(testFile(carsEmptyValueFile)) + + cars.coalesce(1).write + .format("csv") + .option("header", "true") + .option("emptyValue", "empty") + .option("nullValue", null) + .save(csvDir) + + val carsCopy = spark.read + .format("csv") + .schema(dataSchema) + .option("header", "true") + .load(csvDir) + + verifyCars(carsCopy, withHeader = true, checkValues = false) + val results = carsCopy.collect() + assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty")) + assert(results(1).toSeq
=== + Array(1997, "Ford", "E350", "Go get one now they are going fast", null)) + assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty")) + } + } + test("save csv with compression codec option") { withTempDir { dir => val csvDir = new File(dir, "csv").getCanonicalPath From 471b8ba593388b0e53bbc95d71c032e40e058ae5 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sun, 26 Aug 2018 18:29:32 -0500 Subject: [PATCH 2/9] Adding tests --- .../datasources/csv/CSVDataSource.scala | 7 +-- .../datasources/csv/CSVInferSchema.scala | 3 +- .../datasources/csv/CSVInferSchemaSuite.scala | 14 ++++++ .../execution/datasources/csv/CSVSuite.scala | 46 +++++++++++++++++++ 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 2b86054c0ffc..5f427239ced1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -91,9 +91,10 @@ abstract class CSVDataSource extends Serializable { } row.zipWithIndex.map { case (value, index) => - if (value == null || value.isEmpty || value == options.nullValue) { - // When there are empty strings or the values set in `nullValue`, put the - // index as the suffix. + if (value == null || value.isEmpty || value == options.nullValue || + value == options.emptyValueInRead) { + // When there are empty strings or the values set in `nullValue` or in `emptyValue`, + // put the index as the suffix. s"_c$index" } else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { // When there are case-insensitive duplicates, put the index as the suffix. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index a585cbed2551..e7743b07f866 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -79,7 +79,8 @@ private[csv] object CSVInferSchema { * point checking if it is an Int, as the final type must be Double or higher. 
*/ def inferField(typeSoFar: DataType, field: String, options: CSVOptions): DataType = { - if (field == null || field.isEmpty || field == options.nullValue) { + if (field == null || field.isEmpty || field == options.nullValue || + field == options.emptyValueInRead) { typeSoFar } else { typeSoFar match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index 57e36e082653..40273251c378 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -105,6 +105,20 @@ class CSVInferSchemaSuite extends SparkFunSuite { assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) } + test("Empty fields are handled properly when an emptyValue is specified") { + var options = new CSVOptions(Map("emptyValue" -> "empty"), false, "GMT") + assert(CSVInferSchema.inferField(NullType, "empty", options) == NullType) + assert(CSVInferSchema.inferField(StringType, "empty", options) == StringType) + assert(CSVInferSchema.inferField(LongType, "empty", options) == LongType) + + options = new CSVOptions(Map("emptyValue" -> "\\N"), false, "GMT") + assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType) + assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType) + assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType) + assert(CSVInferSchema.inferField(BooleanType, "\\N", options) == BooleanType) + assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) + } + test("Merging Nulltypes should yield Nulltype.") { val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType)) assert(mergedNullTypes.deep == Array(NullType).deep) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 9a375f6657b2..2b39a0b1f52e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1440,6 +1440,52 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } } + test("SPARK-25241: An empty string should not be coerced to null when emptyValue is passed.") { + val litNull: String = null + val df = Seq( + (1, "John Doe"), + (2, ""), + (3, "-"), + (4, litNull) + ).toDF("id", "name") + + // Checks for new behavior where an empty string is not coerced to null when `emptyValue` is + // set to anything but an empty string literal. + withTempPath { path => + df.write + .option("emptyValue", "-") + .csv(path.getAbsolutePath) + val computed = spark.read + .option("emptyValue", "-") + .schema(df.schema) + .csv(path.getAbsolutePath) + val expected = Seq( + (1, "John Doe"), + (2, "-"), + (3, "-"), + (4, "-") + ).toDF("id", "name") + + checkAnswer(computed, expected) + } + // Keeps the old behavior where an empty string is coerced to null when `emptyValue` is not passed.
+ withTempPath { path => + df.write + .csv(path.getAbsolutePath) + val computed = spark.read + .schema(df.schema) + .csv(path.getAbsolutePath) + val expected = Seq( + (1, "John Doe"), + (2, litNull), + (3, "-"), + (4, litNull) + ).toDF("id", "name") + + checkAnswer(computed, expected) + } + } + test("SPARK-24329: skip lines with comments, and one or multiple whitespaces") { val schema = new StructType().add("colA", StringType) val ds = spark From 8e91d5dbc7d91ca9e6439c24248c1d79f06e4b4d Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sun, 26 Aug 2018 18:33:55 -0500 Subject: [PATCH 3/9] Changing emptyValue order arg in streaming.py --- python/pyspark/sql/streaming.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index e3fbc9fa2f03..c61e02d7b109 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -560,11 +560,11 @@ def text(self, path, wholetext=False, lineSep=None): @since(2.0) def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, - ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None, - positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, - maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, + ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, - enforceSchema=None): + enforceSchema=None, emptyValue=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -611,8 +611,6 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. Since 2.0.1, this ``nullValue`` param applies to all supported types including the string type. - :param emptyValue: sets the string representation of an empty value. If None is set, it uses - the default value, empty string. :param nanValue: sets the string representation of a non-number value. If None is set, it uses the default value, ``NaN``. :param positiveInf: sets the string representation of a positive infinity value. If None @@ -660,6 +658,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non the quote character. If None is set, the default value is escape character when escape and quote characters are different, ``\0`` otherwise.. + :param emptyValue: sets the string representation of an empty value. If None is set, it uses + the default value, empty string. 
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming From ddbac3e995dc613fc9ef854a3da05648105c208b Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sun, 26 Aug 2018 21:03:41 -0500 Subject: [PATCH 4/9] Changing emptyValue order arg in set_opts --- python/pyspark/sql/readwriter.py | 14 +++++++------- python/pyspark/sql/streaming.py | 9 +++++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 8f50cd17a18d..965cba914cf8 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -459,13 +459,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, - emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf, - negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, - maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio, - enforceSchema=enforceSchema) + enforceSchema=enforceSchema, emptyValue=emptyValue) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -920,12 +920,12 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No """ self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, - nullValue=nullValue, emptyValue=emptyValue, escapeQuotes=escapeQuotes, - quoteAll=quoteAll, dateFormat=dateFormat, timestampFormat=timestampFormat, + nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, + dateFormat=dateFormat, timestampFormat=timestampFormat, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, - encoding=encoding) + encoding=encoding, emptyValue=emptyValue) self._jwrite.csv(path) @since(1.5) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index c61e02d7b109..522900bf6684 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -671,12 +671,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, - emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf, - negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, - maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, 
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, - charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema) + charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema, + emptyValue=emptyValue) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: From 4cb2be7d09cf02c14ae010402620c025d389a5b9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Sep 2018 22:40:41 +0200 Subject: [PATCH 5/9] Added comments for parameters --- .../execution/datasources/csv/CSVOptions.scala | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index f84f783604e9..492a21be6df3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -117,9 +117,6 @@ class CSVOptions( val nullValue = parameters.getOrElse("nullValue", "") - val emptyValueInRead = parameters.getOrElse("emptyValue", "") - val emptyValueInWrite = parameters.getOrElse("emptyValue", "\"\"") - val nanValue = parameters.getOrElse("nanValue", "NaN") val positiveInf = parameters.getOrElse("positiveInf", "Inf") @@ -165,6 +162,21 @@ class CSVOptions( */ val enforceSchema = getBool("enforceSchema", default = true) + + /** + * String representation of an empty value in read and in write. + */ + val emptyValue = parameters.get("emptyValue") + /** + * The string returned when the CSV reader doesn't have any characters for an input value, + * or reads an empty quoted string `""`. Default value is the empty string. + */ + val emptyValueInRead = emptyValue.getOrElse("") + /** + * The value written out instead of an empty string. Default value is `""`. + */ + val emptyValueInWrite = emptyValue.getOrElse("\"\"") + def asWriterSettings: CsvWriterSettings = { val writerSettings = new CsvWriterSettings() val format = writerSettings.getFormat From 8385c1149037cdc209a326b6945a1fdb1e0e64aa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Sep 2018 23:02:21 +0200 Subject: [PATCH 6/9] Updating the migration guide --- docs/sql-programming-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 374909456927..8b137b6666e2 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
- Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. + - Since Spark 2.4, empty strings are saved as quoted empty strings `""`. In version 2.3 and earlier, empty strings were equal to `null` values and were not represented by any characters in saved CSV files. For example, the row of `"a", null, "", 1` was written as `a,,,1`. Since Spark 2.4, the same row is saved as `a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to empty (not quoted) string. ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above From a89bc673a2f50147a52a38d00075687529246651 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Sun, 26 Aug 2018 18:28:34 -0500 Subject: [PATCH 7/9] Changing order in args for emptyValue --- python/pyspark/sql/readwriter.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 965cba914cf8..3ca5d548ae7d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -345,11 +345,11 @@ def text(self, paths, wholetext=False, lineSep=None): @since(2.0) def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, - ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None, - positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, - maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, - mode=None, columnNameOfCorruptRecord=None, multiLine=None, - charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None): + ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, + columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, + samplingRatio=None, enforceSchema=None, emptyValue=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -395,8 +395,6 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. Since 2.0.1, this ``nullValue`` param applies to all supported types including the string type. - :param emptyValue: sets the string representation of an empty value. If None is set, it uses - the default value, empty string. :param nanValue: sets the string representation of a non-number value. If None is set, it uses the default value, ``NaN``. :param positiveInf: sets the string representation of a positive infinity value. If None @@ -446,6 +444,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non different, ``\0`` otherwise. :param samplingRatio: defines fraction of rows used for schema inferring. If None is set, it uses the default value, ``1.0``. + :param emptyValue: sets the string representation of an empty value. If None is set, it uses + the default value, empty string.
>>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes @@ -859,9 +859,9 @@ def text(self, path, compression=None, lineSep=None): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, - header=None, nullValue=None, emptyValue=None, escapeQuotes=None, quoteAll=None, - dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, - ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None): + header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, + timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, + charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -893,8 +893,6 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No the default value, ``false``. :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. - :param emptyValue: sets the string representation of an empty value. If None is set, it uses - the default value, ``""``. :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to date type. If None is set, it uses the @@ -915,6 +913,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No different, ``\0`` otherwise.. :param encoding: sets the encoding (charset) of saved csv files. If None is set, the default UTF-8 charset will be used. + :param emptyValue: sets the string representation of an empty value. If None is set, it uses + the default value, ``""``. >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ From 75208a478d4ee8ecd1ed94ff6281b437c8c233c1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Sep 2018 22:25:34 +0200 Subject: [PATCH 8/9] Revert "Adding tests" This reverts commit 48e143d43a876afc4f0099bf7079130d74ebe855. --- .../execution/datasources/csv/CSVDataSource.scala | 7 +++---- .../execution/datasources/csv/CSVInferSchema.scala | 3 +-- .../datasources/csv/CSVInferSchemaSuite.scala | 14 -------------- 3 files changed, 4 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 5f427239ced1..2b86054c0ffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -91,10 +91,9 @@ abstract class CSVDataSource extends Serializable { } row.zipWithIndex.map { case (value, index) => - if (value == null || value.isEmpty || value == options.nullValue || - value == options.emptyValueInRead) { - // When there are empty strings or the values set in `nullValue` or in `emptyValue`, - // put the index as the suffix. + if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. s"_c$index" } else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { // When there are case-insensitive duplicates, put the index as the suffix. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index e7743b07f866..a585cbed2551 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -79,8 +79,7 @@ private[csv] object CSVInferSchema { * point checking if it is an Int, as the final type must be Double or higher. */ def inferField(typeSoFar: DataType, field: String, options: CSVOptions): DataType = { - if (field == null || field.isEmpty || field == options.nullValue || - field == options.emptyValueInRead) { + if (field == null || field.isEmpty || field == options.nullValue) { typeSoFar } else { typeSoFar match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index 40273251c378..57e36e082653 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -105,20 +105,6 @@ class CSVInferSchemaSuite extends SparkFunSuite { assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) } - test("Empty fields are handled properly when an emptyValue is specified") { - var options = new CSVOptions(Map("emptyValue" -> "empty"), false, "GMT") - assert(CSVInferSchema.inferField(NullType, "empty", options) == NullType) - assert(CSVInferSchema.inferField(StringType, "empty", options) == StringType) - assert(CSVInferSchema.inferField(LongType, "empty", options) == LongType) - - options = new CSVOptions(Map("emptyValue" -> "\\N"), false, "GMT") - assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType) - assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType) - assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType) - assert(CSVInferSchema.inferField(BooleanType, "\\N", options) == BooleanType) - assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) - } - test("Merging Nulltypes should yield Nulltype.") { val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType)) assert(mergedNullTypes.deep == Array(NullType).deep) From 9a04d87fb4fc52490dafe5dcd0dea85c4f1b3c5d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Sep 2018 22:30:47 +0200 Subject: [PATCH 9/9] Addressing Hyukjin Kwon's concerns --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 8b137b6666e2..9da7d64322eb 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1897,7 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. 
As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. - - Since Spark 2.4, empty strings are saved as quoted empty strings `""`. In version 2.3 and earlier, empty strings were equal to `null` values and were not represented by any characters in saved CSV files. For example, the row of `"a", null, "", 1` was written as `a,,,1`. Since Spark 2.4, the same row is saved as `a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to empty (not quoted) string. + - Since Spark 2.4, empty strings are saved as quoted empty strings `""`. In version 2.3 and earlier, empty strings are equal to `null` values and are not represented by any characters in saved CSV files. For example, the row of `"a", null, "", 1` was written as `a,,,1`. Since Spark 2.4, the same row is saved as `a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to empty (not quoted) string. ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above
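
For readers trying the option out, here is a minimal end-to-end sketch of the behavior this series introduces. It assumes a Spark build that already contains these patches; the `EmptyValueDemo` object name and the output path are placeholders, and the round trip mirrors the SPARK-25241 test above.

// Illustrative sketch only; assumes a Spark build that includes this patch series.
// `EmptyValueDemo` and the output path are placeholders, not part of the patches.
import org.apache.spark.sql.SparkSession

object EmptyValueDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("emptyValueDemo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "John Doe"), (2, ""), (3, "-")).toDF("id", "name")
    val out = "/tmp/empty-value-demo" // placeholder path

    // Write side: an empty string is now emitted as the quoted token `""` by
    // default; setting `emptyValue` overrides that token (here with "-").
    df.write.mode("overwrite").option("emptyValue", "-").csv(out)

    // Read side: `emptyValue` is the string that empty (quoted) fields are
    // replaced with, so the empty name in row 2 round-trips as "-".
    val back = spark.read.schema(df.schema).option("emptyValue", "-").csv(out)
    back.show()

    spark.stop()
  }
}

Note the asymmetric defaults introduced in CSVOptions: `emptyValueInRead` falls back to an empty string while `emptyValueInWrite` falls back to `""`, which is exactly the 2.3-to-2.4 behavior change described in the migration guide entry above.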