[SPARK-19018][SQL] Add support for custom encoding on csv writer #20949
Conversation
ok to test

```scala
val df = spark
  .read
  .option("header", "false")
  .option("encoding", encoding)
```
I think our CSV read encoding option is incomplete for now; there are many ongoing discussions about it. I am going to fix the read path soon. Let me revisit this after fixing it.
Now it's fine. I think we decided to support encoding in CSV/JSON datasources. Ignore the comment above. We can proceed separately.
Test build #88779 has finished for PR 20949 at commit

Test build #90274 has finished for PR 20949 at commit

Test build #90275 has finished for PR 20949 at commit
I've been taking a look at this PR (I've hit this problem in the past and had a chat with @crafty-coder about it and his fixes, too); is there anything we could do to move it forward? Also, is there any way to trigger a rebuild on Jenkins without adding a dummy commit? It looks like the JVM on this test run blew the heap, so a re-run should be enough (cc @holdenk @HyukjinKwon)
I would say this change has value on its own. At the moment the csv reader applies the charset config but the csv writer ignores it, which I think is a bit confusing.
retest this please

Test build #90279 has finished for PR 20949 at commit
```scala
}

test("Save csv with custom charset") {
  Seq("iso-8859-1", "utf-8", "windows-1250").foreach { encoding =>
```
Could you check the UTF-16 and UTF-32 encodings too? The written csv files must contain BOMs for such encodings. I am not sure that the Spark CSV datasource is able to read them in per-line mode (multiLine set to false). Probably you need to switch to multiLine mode or read the files with Scala's library, like in JsonSuite:
spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala, lines 2322 to 2338 in c7e2742:
| test("SPARK-23723: write json in UTF-16/32 with multiline off") { | |
| Seq("UTF-16", "UTF-32").foreach { encoding => | |
| withTempPath { path => | |
| val ds = spark.createDataset(Seq(("a", 1))).repartition(1) | |
| ds.write | |
| .option("encoding", encoding) | |
| .option("multiline", false) | |
| .json(path.getCanonicalPath) | |
| val jsonFiles = path.listFiles().filter(_.getName.endsWith("json")) | |
| jsonFiles.foreach { jsonFile => | |
| val readback = Files.readAllBytes(jsonFile.toPath) | |
| val expected = ("""{"_1":"a","_2":1}""" + "\n").getBytes(Charset.forName(encoding)) | |
| assert(readback === expected) | |
| } | |
| } | |
| } | |
| } |
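A CSV counterpart of that JSON test could look like the sketch below. This is a hypothetical adaptation, not code from the PR: it assumes the suite's implicits for `toDF` and imports of `java.nio.file.Files` and `java.nio.charset.Charset` are in scope, and it uses the platform newline, a point raised later in this review.

```scala
test("SPARK-19018: write csv in UTF-16/32") {
  Seq("UTF-16", "UTF-32").foreach { encoding =>
    withTempPath { path =>
      // Write a single partition so exactly one csv file is produced.
      Seq("a").toDF().repartition(1).write
        .option("encoding", encoding)
        .csv(path.getCanonicalPath)
      path.listFiles().filter(_.getName.endsWith("csv")).foreach { csvFile =>
        // Compare raw bytes: the same charset encoder produces the BOM
        // (where applicable) on both the written file and the expected array.
        val readback = Files.readAllBytes(csvFile.toPath)
        val expected = ("a" + System.lineSeparator()).getBytes(Charset.forName(encoding))
        assert(readback === expected)
      }
    }
  }
}
```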
```scala
val originalDF = Seq("µß áâä ÁÂÄ").toDF("_c0")
// scalastyle:on
originalDF.write
  .option("header", "false")
```
The header flag is disabled by default. Just in case, are there any specific reasons for testing without a CSV header?
My bad, there is no reason. It's fixed in the next commit.
python/pyspark/sql/readwriter.py (outdated)

```python
    the quote character. If None is set, the default value is
    escape character when escape and quote characters are
    different, ``\0`` otherwise..
:param encoding: sets encoding used for encoding the file. If None is set, it
```
Could you reformulate this: "encoding used for encoding".
```scala
  context,
  new Path(path),
  charset
)
```
Move the `)` up, like `charset)`. See https://github.com/databricks/scala-style-guide
| Seq("iso-8859-1", "utf-8", "windows-1250").foreach { encoding => | ||
| withTempDir { dir => | ||
| val csvDir = new File(dir, "csv").getCanonicalPath | ||
| // scalastyle:off |
Let's disable only the specific rule here, e.g.:

```scala
// scalastyle:off nonascii
...
// scalastyle:on nonascii
```
ok to test

Test build #92277 has finished for PR 20949 at commit

Test build #93113 has finished for PR 20949 at commit
```scala
  }
}

test("Save csv with custom charset") {
```
Could you prepend SPARK-19018 to the test title.
Test build #93164 has finished for PR 20949 at commit
```scala
 * enclosed in quotes. Default is to only escape values containing a quote character.</li>
 * <li>`header` (default `false`): writes the names of columns as the first line.</li>
 * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
 * <li>`encoding` (default `UTF-8`): encoding to use when saving to file.</li>
```
I think we should match the doc with JSON's:

spark/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala, lines 525 to 526 in 6ea582e:

```scala
 * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved json
 * files. If it is not set, the UTF-8 charset will be used. </li>
```
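Applied to the CSV writer, the matched doc entry might read something like this (illustrative wording, simply mirroring the JSON entry above):

```scala
 * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
 * files. If it is not set, the UTF-8 charset will be used.</li>
```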
```scala
private val charset = Charset.forName(params.charset)

private val writer = CodecStreams.createOutputStreamWriter(
  context,
```
tiny nit:

```scala
private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
```

```scala
  .option("encoding", encoding)
  .csv(csvDir.getCanonicalPath)

csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile =>
```
`h({` => `h {`
What do you mean?
```scala
csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile =>
  val readback = Files.readAllBytes(csvFile.toPath)
  val expected = (content + "\n").getBytes(Charset.forName(encoding))
```
Currently the newline depends on Univocity, so this test is going to break on Windows. Let's use the platform's newline.
Good point!
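A sketch of the suggested fix, reusing `content` and `encoding` from the test hunk above:

```scala
// Use the platform's line separator instead of a hardcoded "\n",
// since uniVocity emits OS-dependent line endings.
val expected = (content + System.lineSeparator()).getBytes(Charset.forName(encoding))
```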
```scala
test("SPARK-19018: error handling for unsupported charsets") {
  val exception = intercept[SparkException] {
    withTempDir { dir =>
```
withTempPath
```scala
// scalastyle:on nonascii

Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding =>
  withTempDir { dir =>
```
`withTempDir` -> `withTempPath`
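For context, the difference between the two helpers, in a simplified sketch of Spark's SQLTestUtils (not code from this PR): `withTempDir` pre-creates the directory, while `withTempPath` hands the block a path that does not yet exist, which suits writers that create the output directory themselves.

```scala
// Simplified from Spark's SQLTestUtils:
protected def withTempPath(f: File => Unit): Unit = {
  val path = Utils.createTempDir()
  path.delete()                              // hand the block a non-existent path
  try f(path) finally Utils.deleteRecursively(path)
}
```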
```scala
withTempDir { dir =>
  val csvDir = new File(dir, "csv")

  val originalDF = Seq(content).toDF("_c0").repartition(1)
```
toDF("_c0") -> toDF()
python/pyspark/sql/readwriter.py (outdated)

```python
    escape character when escape and quote characters are
    different, ``\0`` otherwise..
:param encoding: sets the encoding (charset) to be used on the csv file. If None is set, it
    uses the default value, ``UTF-8``.
```
Likewise, let's match the doc to JSON's.
- Improve method documentation
- Inline method calls that are not too big
- Use platform newline instead of hardcoded one
- Replace withTempDir with withTempPath
Test build #93227 has finished for PR 20949 at commit

Test build #93226 has finished for PR 20949 at commit

Test build #93229 has finished for PR 20949 at commit
HyukjinKwon left a comment:
LGTM too
retest this please
| .option("encoding", encoding) | ||
| .csv(csvDir.getCanonicalPath) | ||
|
|
||
| csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile => |
nit: `.foreach({` -> `.foreach {` per https://github.com/databricks/scala-style-guide#anonymous-methods
```scala
val csvDir = new File(path, "csv").getCanonicalPath
Seq("a,A,c,A,b,B").toDF().write
  .option("encoding", "1-9588-osi")
  .csv(csvDir)
```
nit: you could use `path.getCanonicalPath` directly
| Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding => | ||
| withTempPath { path => | ||
| val csvDir = new File(path, "csv") | ||
| Seq(content).toDF().write |
nit: add `.repartition(1)` before `.write` to make sure we write only one file
Test build #93531 has finished for PR 20949 at commit
Merged to master.
@crafty-coder, what's your JIRA ID? I need it to assign the JIRA to you.
@HyukjinKwon and @MaxGekk, thanks for your help on this PR! My JIRA ID is also crafty-coder.
What changes were proposed in this pull request?
Add support for custom encoding on the csv writer; see https://issues.apache.org/jira/browse/SPARK-19018.
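A brief usage sketch of the new option (output path and charset are illustrative):

```scala
// Write a DataFrame as CSV in a non-default charset; previously the
// writer always produced UTF-8 regardless of this option.
df.write
  .option("encoding", "iso-8859-1")
  .csv("/tmp/output-latin1")
```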
How was this patch tested?
Added two unit tests in CSVSuite