34 changes: 26 additions & 8 deletions python/pyspark/sql/readwriter.py
@@ -283,8 +283,8 @@ def text(self, paths):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
maxMalformedLogPerPartition=None, mode=None):
negativeInf=None, dateFormat=None, timezone=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -328,6 +328,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
applies to both date type and timestamp type. By default, it is None
which means trying to parse times and date by
``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
:param timezone: defines the timezone to be used for both date type and timestamp type.
If a timezone is specified in the data, this will load them after

Member:
I don't get what this is saying -- if an input string specifies a time zone then that is the time zone to use to parse it. That's it right?

Member Author (@HyukjinKwon, Jul 18, 2016):
Yes, it will use the timezone specified in the input. Since Date and Timestamp do not keep timezone information, it calculates the difference between the timezone specified in the input and the one given in the option, but the reference is the one specified in the option.

I meant to say, for example, that if timezone is set to GMT, all Date and Timestamp values that are read end up in the GMT timezone after the difference is applied. So:

If the CSV data is as below:

27/08/2015 00:00 PDT
27/08/2015 01:00 PDT
27/08/2015 02:00 PDT

If this is read as below:

val df = spark.read
  .format("csv")
  .option("timezone", "GTM")
  .option("dateFormat", "dd/MM/yyyy HH:mm z")
  .load("path")
df.show()

it will become the following in the DataFrame (the difference between GMT and PDT is 7 hours); the values below are timestamp objects, but I represent them as strings here:

+-----------------------+
|                     _1|
+-----------------------+
|2015-08-27 07:00:00.000|
|2015-08-27 08:00:00.000|
|2015-08-27 09:00:00.000|
+-----------------------+

EDITED: I added an example for writing as well, just to be clear.

Then, if you write this as below:

df.write
  .format("csv")
  .option("header", "true")
  .option("timezone", "GTM")
  .option("dateFormat", "dd/MM/yyyy HH:mm z")
  .save("path")

the results will be:

27/08/2015 07:00 GMT
27/08/2015 08:00 GMT
27/08/2015 09:00 GMT
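
For reference, a minimal sketch of the shift described above using plain java.text.SimpleDateFormat (no Spark involved; this is the class the option is wired to later in this diff). The date strings are the ones from the example, and parsing the short zone name PDT assumes an English locale:

import java.text.SimpleDateFormat
import java.util.TimeZone

val fmt = new SimpleDateFormat("dd/MM/yyyy HH:mm z")
fmt.setTimeZone(TimeZone.getTimeZone("GMT"))

// Parsing honours the zone written in the data ("z" matches "PDT", UTC-7 in August),
// while formatting uses the formatter's own zone (GMT), hence the 7-hour shift.
val ts = fmt.parse("27/08/2015 00:00 PDT")
println(fmt.format(ts))  // 27/08/2015 07:00 GMT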

Member Author:
If this behaviour looks fine, I will just change the documentation to make it clearer.

Member:
In the example above, the input specifies a timezone and that must be used to interpret it. You say "it becomes in the dataframe" but what it becomes is a timestamp, which is unambiguous and has no timezone. Timezone matters when converting back to a string for display, but, your example only shows the parameter used on reading, and shows no timezone in the output. I am not sure that this is the intended behavior?

Member Author (@HyukjinKwon, Jul 18, 2016):
I thought it loses the timezone information after being loaded into Spark. I mean, Timestamp and Date instances don't have timezone information in them. The timezone specified in the input is being used in the example...

I am sorry, I don't think I understood this correctly. Do you mind if I ask what you expect before reading, after reading (in the DataFrame), and after writing?

(I just added an example.)

Member Author:
I see. Thank you for the detailed explanation. I should correct comments and the behaviour.

Member Author (@HyukjinKwon, Jul 18, 2016):
It seems this is the default behaviour of SimpleDateFormat. I will look into this more deeply and will fix it or leave some more comments tomorrow. Thanks again!

Contributor:
Why not just have timezone as part of the dateFormat, so users can specify it?

Contributor:
Ah, I see -- you want to control the zone this gets converted to.

Member Author:
I will clean up the PR description and all those soon with a better proposal. Thanks!

calculating the time difference between both. If None is set, it uses
the timezone of your current system.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -354,7 +358,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
dateFormat=dateFormat, timezone=timezone, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
path = [path]
@@ -631,7 +636,7 @@ def text(self, path, compression=None):

@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None):
header=None, dateFormat=None, timezone=None, nullValue=None, escapeQuotes=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -658,14 +663,22 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
``true``, escaping all values containing a quote character.
:param header: writes the names of columns as the first line. If None is set, it uses
the default value, ``false``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This applies to
both date type and timestamp type. By default, it is None which means
writing both as numeric timestamps.
:param timezone: defines the timezone to be used with ``dateFormat`` option. If a timezone
is specified in ``dateFormat`` (e.g. ``Z``), then it will write the
appropriate value with this timezone.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
nullValue=nullValue, escapeQuotes=escapeQuotes)
nullValue=nullValue, escapeQuotes=escapeQuotes, dateFormat=dateFormat,
timezone=timezone)
self._jwrite.csv(path)

@since(1.5)
@@ -949,8 +962,8 @@ def text(self, path):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
maxMalformedLogPerPartition=None, mode=None):
negativeInf=None, dateFormat=None, timezone=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -996,6 +1009,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
applies to both date type and timestamp type. By default, it is None
which means trying to parse times and date by
``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
:param timezone: defines the timezone to be used for both date type and timestamp type.
If a timezone is specified in the data, this will load them after
calculating the time difference between both. If None is set, it uses
the timezone of your current system.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -1021,7 +1038,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
dateFormat=dateFormat, timezone=timezone, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
@@ -378,6 +378,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
* and timestamp type. By default, it is `null` which means trying to parse times and date by
* `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
* <li>`timezone` (default is the timezone of your current system): defines the timezone to
* be used for both date type and timestamp type. If a timezone is specified in the data, this
* will load them after calculating the time difference between both.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
@@ -537,6 +537,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* quotes should always be enclosed in quotes. Default is to escape all values containing
* a quote character.</li>
* <li>`header` (default `false`): writes the names of columns as the first line.</li>
* <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
* formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
* and timestamp type. By default, it is `null` which means writing both as numeric
* timestamps.</li>
* <li>`timezone` (default is the timezone of your current system): defines the timezone to
* be used with `dateFormat` option. If a timezone is specified in `dateFormat` (e.g. `Z`),
* then it will write the appropriate value with this timezone.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
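
A corresponding hedged sketch for the writer side, matching the option names documented above; the output path is hypothetical and `df` is assumed to contain a timestamp column:

df.write
  .option("header", "true")
  .option("dateFormat", "dd/MM/yyyy HH:mmZ")
  .option("timezone", "Asia/Seoul")
  .csv("/tmp/dates_out")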
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv

import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.TimeZone

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -101,10 +102,16 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, String])
name.map(CompressionCodecs.getCodecClassName)
}

private val timezone: Option[String] = parameters.get("timezone")

// Share date format object as it is expensive to parse date pattern.
val dateFormat: SimpleDateFormat = {
val dateFormat = parameters.get("dateFormat")
dateFormat.map(new SimpleDateFormat(_)).orNull
dateFormat.map { f =>
val format = new SimpleDateFormat(f)
timezone.foreach(tz => format.setTimeZone(TimeZone.getTimeZone(tz)))
format
}.orNull
}

val maxColumns = getInt("maxColumns", 20480)
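
One JDK detail worth noting about the TimeZone.getTimeZone call used here (a plain Java behaviour, not specific to this PR): it does not fail on unrecognised IDs, so a mistyped `timezone` option value is silently treated as GMT. A small illustration:

import java.util.TimeZone

println(TimeZone.getTimeZone("Asia/Seoul").getID)  // Asia/Seoul
println(TimeZone.getTimeZone("GMT+09:00").getID)   // GMT+09:00
println(TimeZone.getTimeZone("Not/AZone").getID)   // GMT (silent fallback for unknown IDs)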
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.types._
@@ -179,6 +180,12 @@ private[sql] class CsvOutputWriter(
// create the Generator without separator inserted between 2 records
private[this] val text = new Text()

// A `ValueConverter` is responsible for converting a field of an `InternalRow` to `String`.
private type ValueConverter = (InternalRow, Int) => String

// `ValueConverter`s for all fields of the schema
private val fieldsConverters: Seq[ValueConverter] = dataSchema.map(_.dataType).map(makeConverter)

private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -195,18 +202,40 @@ private[sql] class CsvOutputWriter(
private var records: Long = 0L
private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)

private def rowToString(row: Seq[Any]): Seq[String] = row.map { field =>
if (field != null) {
field.toString
} else {
params.nullValue
private def rowToString(row: InternalRow): Seq[String] = {
var i = 0
val values = new Array[String](row.numFields)
while (i < row.numFields) {

Contributor:
Please use values.indices and then map or foreach to make it more functional (and hopefully readable). With the change, you'll link values to indices (not relying on row.numFields used twice).

BTW, do we need values to be initialized first before adding elements? I'd vote for foldLeft as a better alternative, e.g.

def appendRowValue(arr: Array[String], i: Int) = {
  // do the if here
  arr
}
(0 to row.numFields).foldLeft(Array[String]()) { case (arr, i) => arr }

Member Author (@HyukjinKwon, Jun 26, 2016):
Using functional transformations is generally slower due to virtual function calls. This part is executed a lot, and such overhead becomes really heavy. See https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex

if (!row.isNullAt(i)) {
values(i) = fieldsConverters(i).apply(row, i)
} else {
values(i) = params.nullValue
}
i += 1
}
values
}

private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType if params.dateFormat != null =>
(row: InternalRow, ordinal: Int) =>
params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))

case TimestampType if params.dateFormat != null =>
(row: InternalRow, ordinal: Int) =>
params.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))

Member Author:
@srowen Maybe you are looking for this case. I had to do it like this to avoid per-record type dispatch.

Member:
This is formatting to a string, rather than parsing from a string right?

Member Author:
Oh, sorry. This is not part of this PR. This is:

case _: TimestampType if options.dateFormat != null =>
  // This one will lose microseconds parts.
  // See https://issues.apache.org/jira/browse/SPARK-10681.
  options.dateFormat.parse(datum).getTime * 1000L
case _: TimestampType =>
  // This one will lose microseconds parts.
  // See https://issues.apache.org/jira/browse/SPARK-10681.
  DateTimeUtils.stringToTime(datum).getTime * 1000L
case _: DateType if options.dateFormat != null =>
  DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
case _: DateType =>
  DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)

Member Author (@HyukjinKwon, Jul 18, 2016):
This was merged before here, but timezone was not taken into account.

So, this PR adds support for:

  • timezone for both reading and writing
  • dateFormat for writing.


case udt: UserDefinedType[_] => makeConverter(udt.sqlType)

case dt: DataType =>
(row: InternalRow, ordinal: Int) =>
row.get(ordinal, dt).toString
}

override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")

override protected[sql] def writeInternal(row: InternalRow): Unit = {
csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag)
csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag)
records += 1
if (records % FLUSH_BATCH_SIZE == 0) {
flush()
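
As a side note, a minimal standalone sketch of the per-column converter technique used in CsvOutputWriter above: the type dispatch happens once per column when the converters are built, not once per field per record. The names below are illustrative only, not Spark APIs:

// Build one converter per column up front; the "is this a date column?" decision runs once.
def makeConverter(isDate: Boolean): Any => String =
  if (isDate) (v: Any) => s"<formatted date: $v>"
  else (v: Any) => v.toString

val converters: Array[Any => String] =
  Array(makeConverter(isDate = true), makeConverter(isDate = false))

// Per record: only an array lookup and a function call per field; nulls map to a placeholder.
def rowToString(row: Array[Any], nullValue: String): Array[String] = {
  val values = new Array[String](row.length)
  var i = 0
  while (i < row.length) {  // plain while loop, per the style-guide link above
    values(i) = if (row(i) != null) converters(i)(row(i)) else nullValue
    i += 1
  }
  values
}

println(rowToString(Array(java.sql.Date.valueOf("2015-08-27"), 42), "").mkString(","))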
@@ -665,4 +665,121 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

verifyCars(cars, withHeader = true, checkValues = false)
}

test("Write timestamps correctly with dateFormat and timezone option") {
withTempDir { dir =>
// With dateFormat option.
val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
val datesWithFormat = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("dateFormat", "dd/MM/yyyy HH:mm")
.load(testFile(datesFile))
datesWithFormat.write
.format("csv")
.option("header", "true")
.option("dateFormat", "yyyy/MM/dd HH:mm")
.save(datesWithFormatPath)

// This will load back the timestamps as string.
val stringDatesWithFormat = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "false")
.load(datesWithFormatPath)
val expectedStringDatesWithFormat = Seq(
Row("2015/08/26 18:00"),
Row("2014/10/27 18:30"),
Row("2016/01/28 20:00"))

checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)

// With dateFormat and timezone option.
val datesWithZoneAndFormatPath = s"${dir.getCanonicalPath}/datesWithZoneAndFormat.csv"
val datesWithZoneAndFormat = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("timezone", "GMT")
.option("dateFormat", "dd/MM/yyyy HH:mm")
.load(testFile(datesFile))
datesWithZoneAndFormat.write
.format("csv")
.option("header", "true")
.option("timezone", "Asia/Seoul")
.option("dateFormat", "dd/MM/yyyy HH:mmZ")
.save(datesWithZoneAndFormatPath)

// This will load back the timestamps as string.
val stringDates = spark.read
.format("csv")
.option("header", "true")
.load(datesWithZoneAndFormatPath)
val expectedStringDates = Seq(
Row("27/08/2015 03:00+0900"),
Row("28/10/2014 03:30+0900"),
Row("29/01/2016 05:00+0900"))

checkAnswer(stringDates, expectedStringDates)
}
}

test("Write dates correctly with dateFormat and timezone option") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
withTempDir { dir =>
// With dateFormat option.
val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
val datesWithFormat = spark.read
.format("csv")
.schema(customSchema)
.option("header", "true")
.option("dateFormat", "dd/MM/yyyy HH:mm")
.load(testFile(datesFile))
datesWithFormat.write
.format("csv")
.option("header", "true")
.option("dateFormat", "yyyy/MM/dd")
.save(datesWithFormatPath)

// This will load back the dates as string.
val stringDatesWithFormat = spark.read
.format("csv")
.option("header", "true")
.load(datesWithFormatPath)
val expectedStringDatesWithFormat = Seq(
Row("2015/08/26"),
Row("2014/10/27"),
Row("2016/01/28"))

checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)

// With dateFormat and timezone option.
val datesWithZoneAndFormatPath = s"${dir.getCanonicalPath}/datesWithZoneAndFormat.csv"
val datesWithZoneAndFormat = spark.read
.format("csv")
.schema(customSchema)
.option("header", "true")
.option("dateFormat", "dd/MM/yyyy HH:mm")
.load(testFile(datesFile))
datesWithZoneAndFormat.write
.format("csv")
.option("header", "true")
.option("timezone", "GTM")
.option("dateFormat", "dd/MM/yyyy z")
.save(datesWithZoneAndFormatPath)

// This will load back the dates as string.
val stringDates = spark.read
.format("csv")
.option("header", "true")
.load(datesWithZoneAndFormatPath)
val expectedStringDates = Seq(
Row("26/08/2015 GMT"),
Row("27/10/2014 GMT"),
Row("28/01/2016 GMT"))

checkAnswer(stringDates, expectedStringDates)
}
}
}