From a794988407b6fd28364f5d993a6a52ac0b85ec5f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Feb 2018 21:11:00 +0100 Subject: [PATCH 01/21] Adding the delimiter option encoded in base64 --- .../apache/spark/sql/catalyst/json/JSONOptions.scala | 11 +++++++++++ .../execution/datasources/HadoopFileLinesReader.scala | 10 ++++++++-- .../execution/datasources/json/JsonDataSource.scala | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 652412b34478..08b41bbfa4c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.json import java.util.{Locale, TimeZone} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import org.apache.commons.codec.binary.Base64 import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging @@ -85,6 +86,16 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) + /** + * Standard charset name. For example UTF-8, UTF-16 and UTF-32. + * If charset is not specified (None), it will be detected automatically. + */ + val charset: Option[String] = parameters.get("charset") + + val delimiter: Option[Array[Byte]] = { + parameters.get("delimiter").map(Base64.decodeBase64(_)) + } + /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 83cf26c63a17..6835aaba2761 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl * in that file. 
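For example, a "\r\n" delimiter would be passed to this first version of the option in its base64 form "DQo=" (a minimal standalone sketch using the same commons-codec call as the patch):

    import org.apache.commons.codec.binary.Base64

    // "DQo=" is base64 for the two bytes 0x0D 0x0A, i.e. "\r\n".
    val delimiterBytes: Array[Byte] = Base64.decodeBase64("DQo=")
    // e.g. spark.read.option("delimiter", "DQo=").json("/path/to/input") would then
    // split records on "\r\n" with this patch applied.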
*/ class HadoopFileLinesReader( - file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable { + file: PartitionedFile, + conf: Configuration, + recordDelimiter: Option[Array[Byte]] = None + ) extends Iterator[Text] with Closeable { private val iterator = { val fileSplit = new FileSplit( new Path(new URI(file.filePath)), @@ -42,7 +45,10 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - val reader = new LineRecordReader() + val reader = recordDelimiter match { + case Some(delimiter) => new LineRecordReader(delimiter) + case _ => new LineRecordReader() + } reader.initialize(fileSplit, hadoopAttemptContext) new RecordReaderIterator(reader) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 77e7edc8e7a2..c8efaf089e23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -120,7 +120,7 @@ object TextInputJsonDataSource extends JsonDataSource { file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { - val linesReader = new HadoopFileLinesReader(file, conf) + val linesReader = new HadoopFileLinesReader(file, conf, parser.options.delimiter) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) val safeParser = new FailureSafeParser[Text]( input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), From dccdaa2e97cb4e2f6f8ea7e03320cdb05a43668c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Feb 2018 21:59:46 +0100 Subject: [PATCH 02/21] Separator encoded as a sequence of bytes in hex --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 9 ++++++--- .../sql/execution/datasources/json/JsonDataSource.scala | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 08b41bbfa4c2..977482dfa5c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -92,9 +92,12 @@ private[sql] class JSONOptions( */ val charset: Option[String] = parameters.get("charset") - val delimiter: Option[Array[Byte]] = { - parameters.get("delimiter").map(Base64.decodeBase64(_)) - } + // A separator of json records + val sep: Option[Array[Byte]] = parameters.get("sep").map( + _.replaceAll("[^0-9A-Fa-f]", "") + .sliding(2, 2) + .toArray.map(Integer.parseInt(_, 16).toByte) + ) /** Sets config options on a Jackson [[JsonFactory]]. 
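The hex form introduced here decodes as follows (a standalone sketch of the same expression, assuming the option value "0d 0a"):

    // Non-hex characters in the value only act as separators between the pairs.
    val sep = "0d 0a"
    val sepBytes: Array[Byte] = sep
      .replaceAll("[^0-9A-Fa-f]", "")        // "0d0a"
      .sliding(2, 2).toArray                 // Array("0d", "0a")
      .map(Integer.parseInt(_, 16).toByte)   // Array(0x0d, 0x0a), i.e. "\r\n"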
*/ def setJacksonOptions(factory: JsonFactory): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index c8efaf089e23..ff838972266a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -120,7 +120,7 @@ object TextInputJsonDataSource extends JsonDataSource { file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { - val linesReader = new HadoopFileLinesReader(file, conf, parser.options.delimiter) + val linesReader = new HadoopFileLinesReader(file, conf,parser.options.sep) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) val safeParser = new FailureSafeParser[Text]( input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), From d0abab7e4b74dd42e06972f9484bc712b8f11c63 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Feb 2018 22:06:08 +0100 Subject: [PATCH 03/21] Refactoring: removed unused imports and renaming a parameter --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 1 - .../sql/execution/datasources/HadoopFileLinesReader.scala | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 977482dfa5c9..37ed598d9ec7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.json import java.util.{Locale, TimeZone} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} -import org.apache.commons.codec.binary.Base64 import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 6835aaba2761..9253388e5e8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl class HadoopFileLinesReader( file: PartitionedFile, conf: Configuration, - recordDelimiter: Option[Array[Byte]] = None + lineSeparator: Option[Array[Byte]] = None ) extends Iterator[Text] with Closeable { private val iterator = { val fileSplit = new FileSplit( @@ -45,8 +45,8 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - val reader = recordDelimiter match { - case Some(delimiter) => new LineRecordReader(delimiter) + val reader = lineSeparator match { + case Some(sep) => new LineRecordReader(sep) case _ => new LineRecordReader() } reader.initialize(fileSplit, hadoopAttemptContext) From 674179601b4c82e315eb1156df0f3f5035e91154 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 4 Mar 2018 18:24:42 +0100 Subject: [PATCH 04/21] The sep option is 
renamed to recordSeparator. The supported format is sequence of bytes in hex like x0d 0a --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 7 +++++-- .../sql/execution/datasources/json/JsonDataSource.scala | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 37ed598d9ec7..d59c5c1e9253 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -91,8 +91,11 @@ private[sql] class JSONOptions( */ val charset: Option[String] = parameters.get("charset") - // A separator of json records - val sep: Option[Array[Byte]] = parameters.get("sep").map( + /** + * A sequence of bytes between two consecutive json records. Supported formats: + * - sequence of bytes in hex format (starts from x): x0a 0d + */ + val recordSeparator: Option[Array[Byte]] = parameters.get("recordSeparator").map( _.replaceAll("[^0-9A-Fa-f]", "") .sliding(2, 2) .toArray.map(Integer.parseInt(_, 16).toByte) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index ff838972266a..e003af4dcfac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -120,7 +120,7 @@ object TextInputJsonDataSource extends JsonDataSource { file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { - val linesReader = new HadoopFileLinesReader(file, conf,parser.options.sep) + val linesReader = new HadoopFileLinesReader(file, conf, parser.options.recordSeparator) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) val safeParser = new FailureSafeParser[Text]( input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), From e4faae155cb5b0761da9ac72a12f67cdde6b2e6b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 13:40:21 +0100 Subject: [PATCH 05/21] Renaming recordSeparator to recordDelimiter --- .../scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala | 2 +- .../spark/sql/execution/datasources/json/JsonDataSource.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index d59c5c1e9253..f9f6c6dccbd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -95,7 +95,7 @@ private[sql] class JSONOptions( * A sequence of bytes between two consecutive json records. 
Supported formats: * - sequence of bytes in hex format (starts from x): x0a 0d */ - val recordSeparator: Option[Array[Byte]] = parameters.get("recordSeparator").map( + val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").map( _.replaceAll("[^0-9A-Fa-f]", "") .sliding(2, 2) .toArray.map(Integer.parseInt(_, 16).toByte) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index e003af4dcfac..4a6041df7dc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -120,7 +120,7 @@ object TextInputJsonDataSource extends JsonDataSource { file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { - val linesReader = new HadoopFileLinesReader(file, conf, parser.options.recordSeparator) + val linesReader = new HadoopFileLinesReader(file, conf, parser.options.recordDelimiter) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) val safeParser = new FailureSafeParser[Text]( input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), From 01f4ef584a2cc1ce460359f260ebbe22808d034e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 14:17:59 +0100 Subject: [PATCH 06/21] Comments for the recordDelimiter option --- .../spark/sql/catalyst/json/JSONOptions.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index f9f6c6dccbd5..5c4dcecdfc15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -92,14 +92,20 @@ private[sql] class JSONOptions( val charset: Option[String] = parameters.get("charset") /** - * A sequence of bytes between two consecutive json records. Supported formats: - * - sequence of bytes in hex format (starts from x): x0a 0d + * A sequence of bytes between two consecutive json records. Format of the option is: + * selector (1 char) + delimiter body (any length) + * The following selectors are supported: + * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". + * Hex pairs can be separated by any chars different from 0-9,A-F,a-f + * - '\' - reserved for a sequence of control chars like "\r\n" + * - '/' - reserved for a sequence of visible chars like "/===" */ - val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").map( - _.replaceAll("[^0-9A-Fa-f]", "") - .sliding(2, 2) - .toArray.map(Integer.parseInt(_, 16).toByte) - ) + val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").collect { + case hexs if hexs.startsWith("x") => + hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray + .map(Integer.parseInt(_, 16).toByte) + case d => throw new NotImplementedError(d) + } /** Sets config options on a Jackson [[JsonFactory]]. 
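With the option in this shape, a read call would look roughly like the following (a usage sketch, assuming a SparkSession named `spark` and an illustrative path):

    // Records in the file are separated by "\r\n", given in the hex form "x0d 0a".
    val df = spark.read
      .option("recordDelimiter", "x0d 0a")
      .json("/path/to/records.json")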
*/ def setJacksonOptions(factory: JsonFactory): Unit = { From 24cedb9d809b026fa36b01fb2b425918b43857df Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 15:36:31 +0100 Subject: [PATCH 07/21] Support other formats of recordDelimiter --- .../spark/sql/catalyst/json/JSONOptions.scala | 5 ++-- .../datasources/json/JsonSuite.scala | 27 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 5c4dcecdfc15..770225ec0acf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -98,13 +98,14 @@ private[sql] class JSONOptions( * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". * Hex pairs can be separated by any chars different from 0-9,A-F,a-f * - '\' - reserved for a sequence of control chars like "\r\n" - * - '/' - reserved for a sequence of visible chars like "/===" + * and unicode escape like "\u000D\u000A" */ val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").collect { case hexs if hexs.startsWith("x") => hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray .map(Integer.parseInt(_, 16).toByte) - case d => throw new NotImplementedError(d) + case delim => delim.getBytes(charset.getOrElse( + throw new IllegalArgumentException("Please, set the charset option for the delimiter"))) } /** Sets config options on a Jackson [[JsonFactory]]. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8c8d41ebf115..911a68e392e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2063,4 +2063,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } } + + def readWrittenJson(delimiter: (String, Int)): Unit = { + val (recordDelimiter, index) = delimiter + test(s"read written json in UTF-16BE with delimiter $index") { + val charset = "UTF-16BE" + case class Rec(f1: String, f2: Int) + withTempPath { path => + val ds = spark.createDataset(Seq( + ("a", 1), ("b", 2), ("c", 3)) + ).repartition(1) + ds.write + .option("charset", charset) + .format("json").mode("overwrite") + .save(path.getCanonicalPath) + val savedDf = spark + .read + .schema(ds.schema) + .option("charset", charset) + .option("recordDelimiter", recordDelimiter) + .json(path.getCanonicalPath) + + checkAnswer(savedDf.toDF(), ds.toDF()) + } + } + } + + List("x00 0a", "\n", "\u000a").zipWithIndex.foreach(readWrittenJson(_)) } From d40dda22587deaf79cfad3b20ccf6854554fc11d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 17:30:26 +0100 Subject: [PATCH 08/21] Checking different charsets and record delimiters --- .../execution/datasources/json/JsonSuite.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 911a68e392e4..4019026f5ac3 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2064,10 +2064,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def readWrittenJson(delimiter: (String, Int)): Unit = { - val (recordDelimiter, index) = delimiter - test(s"read written json in UTF-16BE with delimiter $index") { - val charset = "UTF-16BE" + def readSparkJson(charset: String, delimiter: String, runId: Int): Unit = { + test(s"checks Spark is able to read json written by Spark itself #{$runId}") { case class Rec(f1: String, f2: Int) withTempPath { path => val ds = spark.createDataset(Seq( @@ -2081,7 +2079,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .read .schema(ds.schema) .option("charset", charset) - .option("recordDelimiter", recordDelimiter) + .option("recordDelimiter", delimiter) .json(path.getCanonicalPath) checkAnswer(savedDf.toDF(), ds.toDF()) @@ -2089,5 +2087,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - List("x00 0a", "\n", "\u000a").zipWithIndex.foreach(readWrittenJson(_)) + List( + ("\n", "UTF-8"), + ("x00 0a", "UTF-16BE"), + ("\n", "UTF-16LE"), + ("\u000a", "UTF-32BE"), + ("x0a 00 00 00", "UTF-32LE") + ).zipWithIndex.foreach{case ((d, c), i) => readSparkJson(c, d, i)} } From ad6496c6d9415bcd49630272b5d6c327ffcb1378 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 17:39:07 +0100 Subject: [PATCH 09/21] Renaming test's method to make it more readable --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4019026f5ac3..19bbda496b75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2063,8 +2063,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } } - - def readSparkJson(charset: String, delimiter: String, runId: Int): Unit = { + + def checkReadWrittenJson(charset: String, delimiter: String, runId: Int): Unit = { test(s"checks Spark is able to read json written by Spark itself #{$runId}") { case class Rec(f1: String, f2: Int) withTempPath { path => @@ -2093,5 +2093,5 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("\n", "UTF-16LE"), ("\u000a", "UTF-32BE"), ("x0a 00 00 00", "UTF-32LE") - ).zipWithIndex.foreach{case ((d, c), i) => readSparkJson(c, d, i)} + ).zipWithIndex.foreach{case ((d, c), i) => checkReadWrittenJson(c, d, i)} } From 358863d91bf0c0d9761aa13698eb7f8532e5fc90 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 18:20:38 +0100 Subject: [PATCH 10/21] Test of reading json in different charsets and delimiters --- .../datasources/json/JsonSuite.scala | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 19bbda496b75..9031ad10cb75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.json -import java.io.{File, StringWriter} +import java.io.{File, FileOutputStream, StringWriter} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import java.util.Locale @@ -2065,8 +2065,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } def checkReadWrittenJson(charset: String, delimiter: String, runId: Int): Unit = { - test(s"checks Spark is able to read json written by Spark itself #{$runId}") { - case class Rec(f1: String, f2: Int) + test(s"checks Spark is able to read json written by Spark itself #${runId}") { withTempPath { path => val ds = spark.createDataset(Seq( ("a", 1), ("b", 2), ("c", 3)) @@ -2094,4 +2093,46 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("\u000a", "UTF-32BE"), ("x0a 00 00 00", "UTF-32LE") ).zipWithIndex.foreach{case ((d, c), i) => checkReadWrittenJson(c, d, i)} + + def checkReadJson(charset: String, delimiter: String, runId: Int): Unit = { + test(s"checks reading json in ${charset} #${runId}") { + val delimInBytes = { + if (delimiter.startsWith("x")) { + delimiter.replaceAll("[^0-9A-Fa-f]", "") + .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte) + } else { + delimiter.getBytes(charset) + } + } + case class Rec(f1: String, f2: Int) { + def json = s"""{"f1":"${f1}", "f2":$f2}""" + def bytes = json.getBytes(charset) + def row = Row(f1, f2) + } + val schema = new StructType().add("f1", StringType).add("f2", IntegerType) + withTempPath { path => + val records = List(Rec("a", 1), Rec("b", 2)) + val data = records.map(_.bytes).reduce((a1, a2) => a1 ++ delimInBytes ++ a2) + val os = new FileOutputStream(path) + os.write(data) + os.close() + val savedDf = spark + .read + .schema(schema) + .option("charset", charset) + .option("recordDelimiter", delimiter) + .json(path.getCanonicalPath) + checkAnswer(savedDf, records.map(_.row)) + } + } + } + + List( + ("sep", "UTF-8"), + ("x00 0a 00 0d", "UTF-16BE"), + ("\r\n", "UTF-16LE"), + ("\u000d\u000a", "UTF-32BE"), + ("===", "UTF-32LE"), + ("куку", "CP1251") + ).zipWithIndex.foreach{case ((d, c), i) => checkReadJson(c, d, i)} } From 7e5be5e2b4cf7f77914a0d91e74ea31ab8c272d0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 21:25:47 +0100 Subject: [PATCH 11/21] Fix inferring of csv schema for any charsets --- .../catalyst/json/CreateJacksonParser.scala | 10 ++++++ .../spark/sql/catalyst/json/JSONOptions.scala | 10 ++++++ .../datasources/HadoopFileLinesReader.scala | 6 ++-- .../datasources/json/JsonDataSource.scala | 19 +++++++--- .../datasources/text/TextFileFormat.scala | 12 +++++-- .../datasources/text/TextOptions.scala | 4 +++ .../datasources/json/JsonSuite.scala | 35 +++++++++++++------ 7 files changed, 74 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index 025a388aacaa..f6295dac8aa8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, InputStream, InputStreamReader} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.hadoop.io.Text +import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.unsafe.types.UTF8String private[sql] object CreateJacksonParser extends Serializable { @@ -46,4 +47,13 @@ private[sql] object CreateJacksonParser extends Serializable { def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { jsonFactory.createParser(record) } + + def internalRow( + jsonFactory: JsonFactory, + row: InternalRow, + charset: Option[String] = None + ): JsonParser = { + val is = new ByteArrayInputStream(row.getBinary(0)) + inputStream(jsonFactory, is, charset) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 770225ec0acf..e6e56871b7c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -99,6 +99,10 @@ private[sql] class JSONOptions( * Hex pairs can be separated by any chars different from 0-9,A-F,a-f * - '\' - reserved for a sequence of control chars like "\r\n" * and unicode escape like "\u000D\u000A" + * + * Note: the option defines a delimiter for the json reader only, the json writer + * uses '\n' as the delimiter of output records (it is converted to sequence of + * bytes according to charset) */ val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").collect { case hexs if hexs.startsWith("x") => @@ -119,4 +123,10 @@ private[sql] class JSONOptions( allowBackslashEscapingAnyCharacter) factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } + + def getTextOptions: Map[String, String] = { + recordDelimiter.map{ bytes => + "recordDelimiter" -> bytes.map("%02x".format(_)).mkString + }.toMap + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 9253388e5e8f..0e5a7462770b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl class HadoopFileLinesReader( file: PartitionedFile, conf: Configuration, - lineSeparator: Option[Array[Byte]] = None + recordDelimiter: Option[Array[Byte]] = None ) extends Iterator[Text] with Closeable { private val iterator = { val fileSplit = new FileSplit( @@ -45,8 +45,8 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - val reader = lineSeparator match { - case Some(sep) => new LineRecordReader(sep) + val reader = recordDelimiter match { + case Some(delim) => new LineRecordReader(delim) case _ => new LineRecordReader() } reader.initialize(fileSplit, hadoopAttemptContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 4a6041df7dc1..f9321cbc5c21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ 
-33,6 +33,7 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat @@ -92,25 +93,33 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { - val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths) + val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) - val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) - JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) + val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd + + JsonInferSchema.infer[InternalRow]( + rdd, + parsedOptions, + CreateJacksonParser.internalRow(_, _, parsedOptions.charset) + ) } private def createBaseDataset( sparkSession: SparkSession, - inputPaths: Seq[FileStatus]): Dataset[String] = { + inputPaths: Seq[FileStatus], + parsedOptions: JSONOptions + ): Dataset[String] = { val paths = inputPaths.map(_.getPath.toString) sparkSession.baseRelationToDataFrame( DataSource.apply( sparkSession, paths = paths, - className = classOf[TextFileFormat].getName + className = classOf[TextFileFormat].getName, + options = parsedOptions.getTextOptions ).resolveRelation(checkFilesExist = false)) .select("value").as(Encoders.STRING) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index c661e9bd3b94..c7af107e296d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -113,18 +113,24 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText) + readToUnsafeMem( + broadcastedHadoopConf, + requiredSchema, + textOptions.wholeText, + textOptions.recordDelimiter + ) } private def readToUnsafeMem( conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, - wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = { + wholeTextMode: Boolean, + recordDelimiter: Option[Array[Byte]]): (PartitionedFile) => Iterator[UnsafeRow] = { (file: PartitionedFile) => { val confValue = conf.value.value val reader = if (!wholeTextMode) { - new HadoopFileLinesReader(file, confValue) + new HadoopFileLinesReader(file, confValue, recordDelimiter) } else { new HadoopFileWholeTextReader(file, confValue) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index 2a661561ab51..eea8e04ad171 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -39,9 +39,13 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean + val recordDelimiter: Option[Array[Byte]] = parameters.get(RECORDDELIMITER).map { hex => + hex.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte) + } } private[text] object TextOptions { val COMPRESSION = "compression" val WHOLETEXT = "wholetext" + val RECORDDELIMITER = "recordDelimiter" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 9031ad10cb75..2d681d1be0aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2094,7 +2094,12 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("x0a 00 00 00", "UTF-32LE") ).zipWithIndex.foreach{case ((d, c), i) => checkReadWrittenJson(c, d, i)} - def checkReadJson(charset: String, delimiter: String, runId: Int): Unit = { + def checkReadJson( + charset: String, + delimiter: String, + inferSchema: Boolean, + runId: Int + ): Unit = { test(s"checks reading json in ${charset} #${runId}") { val delimInBytes = { if (delimiter.startsWith("x")) { @@ -2116,9 +2121,12 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val os = new FileOutputStream(path) os.write(data) os.close() - val savedDf = spark - .read - .schema(schema) + val reader = if (inferSchema) { + spark.read + } else { + spark.read.schema(schema) + } + val savedDf = reader .option("charset", charset) .option("recordDelimiter", delimiter) .json(path.getCanonicalPath) @@ -2128,11 +2136,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } List( - ("sep", "UTF-8"), - ("x00 0a 00 0d", "UTF-16BE"), - ("\r\n", "UTF-16LE"), - ("\u000d\u000a", "UTF-32BE"), - ("===", "UTF-32LE"), - ("куку", "CP1251") - ).zipWithIndex.foreach{case ((d, c), i) => checkReadJson(c, d, i)} + ("sep", "UTF-8", false), + ("x00 0a 00 0d", "UTF-16BE", false), + ("x00 0a 00 0d", "UTF-16BE", true), + ("\r\n", "UTF-16LE", false), + ("\r\n", "UTF-16LE", true), + ("\u000d\u000a", "UTF-32BE", false), + ("\u000a\u000d", "UTF-32BE", true), + ("===", "UTF-32LE", false), + ("$^+", "UTF-32LE", true), + ("куку", "CP1251", false), + ("куку", "CP1251", true) + ).zipWithIndex.foreach{case ((d, c, s), i) => checkReadJson(c, d, s, i)} } From d138d2d4e7b6e0c3e46d73939ff06a875128d59d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Mar 2018 22:02:44 +0100 Subject: [PATCH 12/21] Fix errors of scalastyle check --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 2d681d1be0aa..4d708cf52248 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2145,7 +2145,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("\u000a\u000d", "UTF-32BE", true), ("===", "UTF-32LE", false), ("$^+", "UTF-32LE", true), - ("куку", "CP1251", false), - ("куку", "CP1251", true) + ("xEA.F3.EA.F3", "CP1251", false), + ("xEA.F3.EA.F3", "CP1251", true) ).zipWithIndex.foreach{case ((d, c, s), i) => checkReadJson(c, d, s, i)} } From c26ef5d3d2a3970c80c973eec696805929bd7725 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Mar 2018 12:20:34 +0100 Subject: [PATCH 13/21] Reserving format for regular expressions and concatenated json --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index e6e56871b7c9..b4deab202026 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -99,6 +99,8 @@ private[sql] class JSONOptions( * Hex pairs can be separated by any chars different from 0-9,A-F,a-f * - '\' - reserved for a sequence of control chars like "\r\n" * and unicode escape like "\u000D\u000A" + * - 'r' - specifies a regular expression + * - 'none' - json records are not divided by any delimiter * * Note: the option defines a delimiter for the json reader only, the json writer * uses '\n' as the delimiter of output records (it is converted to sequence of @@ -108,6 +110,8 @@ private[sql] class JSONOptions( case hexs if hexs.startsWith("x") => hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray .map(Integer.parseInt(_, 16).toByte) + case reserved if reserved.startsWith("r") || reserved.startsWith("none") => + throw new NotImplementedError(s"the $reserved selector has not supported yet") case delim => delim.getBytes(charset.getOrElse( throw new IllegalArgumentException("Please, set the charset option for the delimiter"))) } From 5f0b0694f142bd69127c8991d83a24f528316b2b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Mar 2018 21:18:21 +0100 Subject: [PATCH 14/21] Fix recordDelimiter tests --- .../catalyst/json/CreateJacksonParser.scala | 4 +- .../spark/sql/catalyst/json/JSONOptions.scala | 6 +-- .../datasources/json/JsonSuite.scala | 49 +++---------------- 3 files changed, 12 insertions(+), 47 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index f6295dac8aa8..a27b73a5e827 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -53,7 +53,9 @@ private[sql] object CreateJacksonParser extends Serializable { row: InternalRow, charset: Option[String] = None ): JsonParser = { + require(charset == Some("UTF-8")) val is = new ByteArrayInputStream(row.getBinary(0)) - inputStream(jsonFactory, is, charset) + + inputStream(jsonFactory, is) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index b4deab202026..e0e78453d972 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -85,11 +85,7 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - /** - * Standard charset name. For example UTF-8, UTF-16 and UTF-32. - * If charset is not specified (None), it will be detected automatically. - */ - val charset: Option[String] = parameters.get("charset") + val charset: Option[String] = Some("UTF-8") /** * A sequence of bytes between two consecutive json records. Format of the option is: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4d708cf52248..e0322c0da818 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2063,36 +2063,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } } - - def checkReadWrittenJson(charset: String, delimiter: String, runId: Int): Unit = { - test(s"checks Spark is able to read json written by Spark itself #${runId}") { - withTempPath { path => - val ds = spark.createDataset(Seq( - ("a", 1), ("b", 2), ("c", 3)) - ).repartition(1) - ds.write - .option("charset", charset) - .format("json").mode("overwrite") - .save(path.getCanonicalPath) - val savedDf = spark - .read - .schema(ds.schema) - .option("charset", charset) - .option("recordDelimiter", delimiter) - .json(path.getCanonicalPath) - - checkAnswer(savedDf.toDF(), ds.toDF()) - } - } - } - - List( - ("\n", "UTF-8"), - ("x00 0a", "UTF-16BE"), - ("\n", "UTF-16LE"), - ("\u000a", "UTF-32BE"), - ("x0a 00 00 00", "UTF-32LE") - ).zipWithIndex.foreach{case ((d, c), i) => checkReadWrittenJson(c, d, i)} def checkReadJson( charset: String, @@ -2127,7 +2097,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read.schema(schema) } val savedDf = reader - .option("charset", charset) .option("recordDelimiter", delimiter) .json(path.getCanonicalPath) checkAnswer(savedDf, records.map(_.row)) @@ -2137,15 +2106,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { List( ("sep", "UTF-8", false), - ("x00 0a 00 0d", "UTF-16BE", false), - ("x00 0a 00 0d", "UTF-16BE", true), - ("\r\n", "UTF-16LE", false), - ("\r\n", "UTF-16LE", true), - ("\u000d\u000a", "UTF-32BE", false), - ("\u000a\u000d", "UTF-32BE", true), - ("===", "UTF-32LE", false), - ("$^+", "UTF-32LE", true), - ("xEA.F3.EA.F3", "CP1251", false), - ("xEA.F3.EA.F3", "CP1251", true) + ("x00 0a 00 0d", "UTF-8", true), + ("xEA.F3.EA.F3", "UTF-8", false), + ("\r\n", "UTF-8", false), + ("\r\n", "UTF-8", true), + ("\u000d\u000a", "UTF-8", false), + ("\u000a\u000d", "UTF-8", true), + ("===", "UTF-8", false), + ("$^+", "UTF-8", true) ).zipWithIndex.foreach{case ((d, c, s), i) => checkReadJson(c, d, s, i)} } From ef8248f862949becdb3d370ac94a1cfc1f7c3068 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Mar 2018 21:34:56 +0100 Subject: [PATCH 15/21] Additional cases are added to the delimiter test --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index e0322c0da818..a2ffc5b60e9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2104,7 +2104,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + // scalastyle:off nonascii List( + ("|", "UTF-8", false), + ("^", "UTF-8", true), + ("::", "UTF-8", true), + ("!!!@3", "UTF-8", false), + (0x1E.toChar.toString, "UTF-8", true), + ("아", "UTF-8", false), + ("куку", "UTF-8", true), ("sep", "UTF-8", false), ("x00 0a 00 0d", "UTF-8", true), ("xEA.F3.EA.F3", "UTF-8", false), @@ -2115,4 +2123,5 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("===", "UTF-8", false), ("$^+", "UTF-8", true) ).zipWithIndex.foreach{case ((d, c, s), i) => checkReadJson(c, d, s, i)} + // scalastyle:on nonascii } From 2efac082ea4e40b89b4d01274851c0dcdd49eb44 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Mar 2018 22:01:56 +0100 Subject: [PATCH 16/21] Renaming recordDelimiter to lineSeparator --- .../spark/sql/catalyst/json/JSONOptions.scala | 17 ++++++++++------- .../sql/catalyst/json/JacksonGenerator.scala | 7 ++++++- .../datasources/HadoopFileLinesReader.scala | 4 ++-- .../datasources/json/JsonDataSource.scala | 2 +- .../datasources/text/TextFileFormat.scala | 6 +++--- .../datasources/text/TextOptions.scala | 4 ++-- .../execution/datasources/json/JsonSuite.scala | 2 +- 7 files changed, 25 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index e0e78453d972..daf92e1cf353 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -89,28 +89,31 @@ private[sql] class JSONOptions( /** * A sequence of bytes between two consecutive json records. Format of the option is: - * selector (1 char) + delimiter body (any length) + * selector (1 char) + delimiter body (any length) | sequence of chars * The following selectors are supported: * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". 
* Hex pairs can be separated by any chars different from 0-9,A-F,a-f * - '\' - reserved for a sequence of control chars like "\r\n" * and unicode escape like "\u000D\u000A" - * - 'r' - specifies a regular expression - * - 'none' - json records are not divided by any delimiter + * - 'r' and '/' - reserved for future use * * Note: the option defines a delimiter for the json reader only, the json writer * uses '\n' as the delimiter of output records (it is converted to sequence of * bytes according to charset) */ - val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").collect { + val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect { case hexs if hexs.startsWith("x") => hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray .map(Integer.parseInt(_, 16).toByte) - case reserved if reserved.startsWith("r") || reserved.startsWith("none") => + case reserved if reserved.startsWith("r") || reserved.startsWith("/") => throw new NotImplementedError(s"the $reserved selector has not supported yet") case delim => delim.getBytes(charset.getOrElse( throw new IllegalArgumentException("Please, set the charset option for the delimiter"))) } + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator + + // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. + val lineSeparatorInWrite: String = parameters.get("lineSepInWrite").getOrElse("\n") /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { @@ -125,8 +128,8 @@ private[sql] class JSONOptions( } def getTextOptions: Map[String, String] = { - recordDelimiter.map{ bytes => - "recordDelimiter" -> bytes.map("%02x".format(_)).mkString + lineSeparatorInRead.map{ bytes => + "lineSep" -> bytes.map("%02x".format(_)).mkString }.toMap } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index eb06e4f304f0..a94a2fe7881d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -74,6 +74,8 @@ private[sql] class JacksonGenerator( private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) + private val lineSeparator: String = options.lineSeparatorInWrite + private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => (row: SpecializedGetters, ordinal: Int) => @@ -251,5 +253,8 @@ private[sql] class JacksonGenerator( mapType = dataType.asInstanceOf[MapType])) } - def writeLineEnding(): Unit = gen.writeRaw('\n') + def writeLineEnding(): Unit = { + // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. 
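The hand-off to the text datasource above works by printing the separator bytes as a hex string in getTextOptions and parsing them back in TextOptions; roughly (a minimal sketch of that round trip):

    val sepBytes: Array[Byte] = Array(0x0d.toByte, 0x0a.toByte)
    val asHex: String = sepBytes.map("%02x".format(_)).mkString   // "0d0a", as in getTextOptions
    val decoded: Array[Byte] = asHex
      .sliding(2, 2).toArray
      .map(Integer.parseInt(_, 16).toByte)                        // back to 0x0d, 0x0a in TextOptions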
+ gen.writeRaw(lineSeparator) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 0e5a7462770b..e7ad6c32e024 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl class HadoopFileLinesReader( file: PartitionedFile, conf: Configuration, - recordDelimiter: Option[Array[Byte]] = None + lineSeparator: Option[Array[Byte]] = None ) extends Iterator[Text] with Closeable { private val iterator = { val fileSplit = new FileSplit( @@ -45,7 +45,7 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - val reader = recordDelimiter match { + val reader = lineSeparator match { case Some(delim) => new LineRecordReader(delim) case _ => new LineRecordReader() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index f9321cbc5c21..b2a4503e50a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -129,7 +129,7 @@ object TextInputJsonDataSource extends JsonDataSource { file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { - val linesReader = new HadoopFileLinesReader(file, conf, parser.options.recordDelimiter) + val linesReader = new HadoopFileLinesReader(file, conf, parser.options.lineSeparatorInRead) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) val safeParser = new FailureSafeParser[Text]( input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index c7af107e296d..e606da213d0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -117,7 +117,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { broadcastedHadoopConf, requiredSchema, textOptions.wholeText, - textOptions.recordDelimiter + textOptions.lineSeparator ) } @@ -125,12 +125,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, wholeTextMode: Boolean, - recordDelimiter: Option[Array[Byte]]): (PartitionedFile) => Iterator[UnsafeRow] = { + lineSeparator: Option[Array[Byte]]): (PartitionedFile) => Iterator[UnsafeRow] = { (file: PartitionedFile) => { val confValue = conf.value.value val reader = if (!wholeTextMode) { - new HadoopFileLinesReader(file, confValue, recordDelimiter) + new HadoopFileLinesReader(file, confValue, lineSeparator) } else { new HadoopFileWholeTextReader(file, confValue) } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index eea8e04ad171..a3cebd7cb62c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -39,7 +39,7 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - val recordDelimiter: Option[Array[Byte]] = parameters.get(RECORDDELIMITER).map { hex => + val lineSeparator: Option[Array[Byte]] = parameters.get(LINESEP).map { hex => hex.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte) } } @@ -47,5 +47,5 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti private[text] object TextOptions { val COMPRESSION = "compression" val WHOLETEXT = "wholetext" - val RECORDDELIMITER = "recordDelimiter" + val LINESEP = "lineSep" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index a2ffc5b60e9a..72e6941fe7af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2097,7 +2097,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read.schema(schema) } val savedDf = reader - .option("recordDelimiter", delimiter) + .option("lineSep", delimiter) .json(path.getCanonicalPath) checkAnswer(savedDf, records.map(_.row)) } From b2020fa99584d03e1754a4a1b5991dce4875f448 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Mar 2018 22:38:33 +0100 Subject: [PATCH 17/21] Adding HyukjinKwon changes --- python/pyspark/sql/readwriter.py | 16 ++++--- python/pyspark/sql/streaming.py | 4 +- python/pyspark/sql/tests.py | 17 +++++++ .../apache/spark/sql/DataFrameReader.scala | 3 ++ .../apache/spark/sql/DataFrameWriter.scala | 3 ++ .../sql/streaming/DataStreamReader.scala | 3 ++ .../datasources/json/JsonSuite.scala | 45 +++++++++++++++---- 7 files changed, 75 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index facc16bc5310..ea06e4b71e90 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, - multiLine=None, allowUnquotedControlChars=None): + multiLine=None, allowUnquotedControlChars=None, lineSep=None): """ Loads JSON files and returns the results as a :class:`DataFrame`. @@ -237,7 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. - + :param lineSep: defines the line separator that should be used for parsing. If None is + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. 
>>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes [('age', 'bigint'), ('name', 'string')] @@ -254,7 +255,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, timestampFormat=timestampFormat, multiLine=multiLine, - allowUnquotedControlChars=allowUnquotedControlChars) + allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -744,7 +745,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options) self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None): + def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None, + lineSep=None): """Saves the content of the :class:`DataFrame` in JSON format (`JSON Lines text format or newline-delimited JSON `_) at the specified path. @@ -768,12 +770,14 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``. - + :param lineSep: defines the line separator that should be used for writing. If None is + set, it uses the default value, ``\\n``. >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts( - compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat) + compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat, + lineSep=lineSep) self._jwrite.json(path) @since(1.4) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index e8966c20a8f4..a0b6180ef14b 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -407,7 +407,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, - multiLine=None, allowUnquotedControlChars=None): + multiLine=None, allowUnquotedControlChars=None, lineSep=None): """ Loads a JSON file stream and returns the results as a :class:`DataFrame`. 
@@ -484,7 +484,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, timestampFormat=timestampFormat, multiLine=multiLine, - allowUnquotedControlChars=allowUnquotedControlChars) + allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 480815d27333..5045cc20d682 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -648,6 +648,23 @@ def test_non_existed_udaf(self): self.assertRaisesRegexp(AnalysisException, "Can not load class non_existed_udaf", lambda: spark.udf.registerJavaUDAF("udaf1", "non_existed_udaf")) + def test_linesep_json(self): + df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",") + expected = [Row(_corrupt_record=None, name=u'Michael'), + Row(_corrupt_record=u' "age":30}\n{"name":"Justin"', name=None), + Row(_corrupt_record=u' "age":19}\n', name=None)] + self.assertEqual(df.collect(), expected) + + tpath = tempfile.mkdtemp() + shutil.rmtree(tpath) + try: + df = self.spark.read.json("python/test_support/sql/people.json") + df.write.json(tpath, lineSep="!!") + readback = self.spark.read.json(tpath, lineSep="!!") + self.assertEqual(readback.collect(), df.collect()) + finally: + shutil.rmtree(tpath) + def test_multiLine_json(self): people1 = self.spark.read.json("python/test_support/sql/people.json") people_array = self.spark.read.json("python/test_support/sql/people_array.json", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0139913aaa4e..4a0dcc80107b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -367,6 +367,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
   * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file</li>
   * </ul>
+   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+   * that should be used for parsing.</li>
+   * </ul>
   *
   * @since 2.0.0
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ed7a9100cc7f..28ad15a470b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -519,6 +519,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * </ul>
+   * <li>`lineSep` (default `\n`): defines the line separator that should
+   * be used for writing.</li>
+   * </ul>
   *
   * @since 1.4.0
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index c393dcdfdd7e..b23fe3f03a96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -269,6 +269,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file</li>
   * </ul>
+   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+   * that should be used for parsing.</li>
+   * </ul>
  • + * * * @since 2.0.0 */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 72e6941fe7af..7aaa623a0bdf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.json import java.io.{File, FileOutputStream, StringWriter} import java.nio.charset.StandardCharsets +import java.nio.file.Files import java.sql.{Date, Timestamp} import java.util.Locale @@ -27,7 +28,7 @@ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{functions => F, _} import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} @@ -2066,17 +2067,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { def checkReadJson( charset: String, - delimiter: String, + lineSep: String, inferSchema: Boolean, runId: Int ): Unit = { test(s"checks reading json in ${charset} #${runId}") { val delimInBytes = { - if (delimiter.startsWith("x")) { - delimiter.replaceAll("[^0-9A-Fa-f]", "") + if (lineSep.startsWith("x")) { + lineSep.replaceAll("[^0-9A-Fa-f]", "") .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte) } else { - delimiter.getBytes(charset) + lineSep.getBytes(charset) } } case class Rec(f1: String, f2: Int) { @@ -2097,7 +2098,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read.schema(schema) } val savedDf = reader - .option("lineSep", delimiter) + .option("lineSep", lineSep) .json(path.getCanonicalPath) checkAnswer(savedDf, records.map(_.row)) } @@ -2114,8 +2115,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("아", "UTF-8", false), ("куку", "UTF-8", true), ("sep", "UTF-8", false), - ("x00 0a 00 0d", "UTF-8", true), - ("xEA.F3.EA.F3", "UTF-8", false), + ("x0a 0d", "UTF-8", true), + ("x54.45", "UTF-8", false), ("\r\n", "UTF-8", false), ("\r\n", "UTF-8", true), ("\u000d\u000a", "UTF-8", false), @@ -2124,4 +2125,32 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("$^+", "UTF-8", true) ).zipWithIndex.foreach{case ((d, c, s), i) => checkReadJson(c, d, s, i)} // scalastyle:on nonascii + + def testLineSeparator(lineSep: String): Unit = { + test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { + // Write + withTempPath { path => + Seq("a", "b", "c").toDF("value").coalesce(1) + .write.option("lineSepInWrite", lineSep).json(path.getAbsolutePath) + val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head + val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) + assert( + readBack === s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""") + } + + // Roundtrip + withTempPath { path => + val df = Seq("a", "b", "c").toDF() + df.write.option("lineSepInWrite", lineSep).json(path.getAbsolutePath) + val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) + checkAnswer(df, readBack) + } + } + } + + // scalastyle:off nonascii + Seq("|", "^", "::", "!!!@3", 
0x1E.toChar.toString, "아").foreach { lineSep => + testLineSeparator(lineSep) + } + // scalastyle:on nonascii } From f99c1e16f2ad90c2a94e8c4b206b5b740506e136 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Mar 2018 23:23:21 +0100 Subject: [PATCH 18/21] Revert lineSepInWrite back to lineSep --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 +++- .../spark/sql/execution/datasources/json/JsonSuite.scala | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index daf92e1cf353..a447911067da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -113,7 +113,9 @@ private[sql] class JSONOptions( val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. - val lineSeparatorInWrite: String = parameters.get("lineSepInWrite").getOrElse("\n") + val lineSeparatorInWrite: String = { + lineSeparator.map(new String(_, charset.getOrElse("UTF-8"))).getOrElse("\n") + } /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 7aaa623a0bdf..8ad31231d51c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2131,7 +2131,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // Write withTempPath { path => Seq("a", "b", "c").toDF("value").coalesce(1) - .write.option("lineSepInWrite", lineSep).json(path.getAbsolutePath) + .write.option("lineSep", lineSep).json(path.getAbsolutePath) val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) assert( @@ -2141,7 +2141,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // Roundtrip withTempPath { path => val df = Seq("a", "b", "c").toDF() - df.write.option("lineSepInWrite", lineSep).json(path.getAbsolutePath) + df.write.option("lineSep", lineSep).json(path.getAbsolutePath) val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) checkAnswer(df, readBack) } From 77112ef5b12d4738914c78b46c25d058e6201b61 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Mar 2018 00:07:22 +0100 Subject: [PATCH 19/21] Fix passing of the lineSeparator to HadoopFileLinesReader --- .../spark/sql/execution/datasources/json/JsonDataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index b2a4503e50a3..c09a1617690b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -129,7 +129,7 @@ object TextInputJsonDataSource extends 
JsonDataSource { file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { - val linesReader = new HadoopFileLinesReader(file, conf, parser.options.lineSeparatorInRead) + val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) val safeParser = new FailureSafeParser[Text]( input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), From d632706bf14c7a7c2688237e6dc552ca5aa9c98a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Mar 2018 09:39:35 +0100 Subject: [PATCH 20/21] Fix python style checking --- python/pyspark/sql/readwriter.py | 2 ++ python/pyspark/sql/streaming.py | 2 ++ python/pyspark/sql/tests.py | 12 ++++++------ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index d3e2c7471293..ce428b228338 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -239,6 +239,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, including tab and line feed characters) or not. :param lineSep: defines the line separator that should be used for parsing. If None is set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. + >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes [('age', 'bigint'), ('name', 'string')] @@ -774,6 +775,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``. :param lineSep: defines the line separator that should be used for writing. If None is set, it uses the default value, ``\\n``. + >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 886049964d49..490df4accf87 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -470,6 +470,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. + :param lineSep: defines the line separator that should be used for parsing. If None is + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. 
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 09b6e170f3af..505fc056369f 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -670,6 +670,12 @@ def test_linesep_text(self): finally: shutil.rmtree(tpath) + def test_multiline_json(self): + people1 = self.spark.read.json("python/test_support/sql/people.json") + people_array = self.spark.read.json("python/test_support/sql/people_array.json", + multiLine=True) + self.assertEqual(people1.collect(), people_array.collect()) + def test_linesep_json(self): df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",") expected = [Row(_corrupt_record=None, name=u'Michael'), @@ -687,12 +693,6 @@ def test_linesep_json(self): finally: shutil.rmtree(tpath) - def test_multiline_json(self): - people1 = self.spark.read.json("python/test_support/sql/people.json") - people_array = self.spark.read.json("python/test_support/sql/people_array.json", - multiLine=True) - self.assertEqual(people1.collect(), people_array.collect()) - def test_multiline_csv(self): ages_newlines = self.spark.read.csv( "python/test_support/sql/ages_newlines.csv", multiLine=True) From bbff40206e6871ea9ab035e7a8876f495bdf3d90 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Mar 2018 11:58:55 +0100 Subject: [PATCH 21/21] Fix text source tests and javadoc comments --- .../apache/spark/sql/catalyst/json/JSONOptions.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameReader.scala | 1 - .../scala/org/apache/spark/sql/DataFrameWriter.scala | 1 - .../sql/execution/datasources/text/TextOptions.scala | 12 ++++++++++-- .../spark/sql/streaming/DataStreamReader.scala | 1 - 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index a447911067da..c908c7254b0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -131,7 +131,7 @@ private[sql] class JSONOptions( def getTextOptions: Map[String, String] = { lineSeparatorInRead.map{ bytes => - "lineSep" -> bytes.map("%02x".format(_)).mkString + "lineSep" -> bytes.map("x%02x".format(_)).mkString }.toMap } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index af60db0aefbf..ae3ba1690f69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -366,7 +366,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. *
   * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file</li>
-   * </ul>
   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * </ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d15cc90f8e3f..bbc063148a72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -518,7 +518,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
-   * </ul>
   * <li>`lineSep` (default `\n`): defines the line separator that should
   * be used for writing.</li>
   * </ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index a1b5d22b65d9..386512c612ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -41,8 +41,16 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
   */
  val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

-  val lineSeparator: Option[Array[Byte]] = parameters.get(LINE_SEPARATOR).map { hex =>
-    hex.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
+  val charset: Option[String] = Some("UTF-8")
+
+  val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect {
+    case hexs if hexs.startsWith("x") =>
+      hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
+        .map(Integer.parseInt(_, 16).toByte)
+    case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
+      throw new NotImplementedError(s"the $reserved selector is not supported yet")
+    case delim => delim.getBytes(charset.getOrElse(
+      throw new IllegalArgumentException("Please set the charset option for the delimiter")))
  }

  // Note that the option 'lineSep' uses a different default value in read and write.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index ffec904f8a41..ae93965bc50e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -268,7 +268,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * `java.text.SimpleDateFormat`. This applies to timestamp type.
   * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file</li>
-   * </ul>
   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * </ul>
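
To make the new option concrete, a minimal end-to-end sketch is shown below. It mirrors the roundtrip test added to JsonSuite in this series; the object name, application name and output path are illustrative assumptions and are not part of the patch.

import org.apache.spark.sql.SparkSession

object LineSepRoundTrip {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark build that contains the lineSep option introduced above.
    val spark = SparkSession.builder()
      .appName("lineSep-roundtrip")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/lineSep-json-example"

    // Write three JSON records separated by "!!" instead of the default "\n".
    // The single part file then contains:
    //   {"value":"a"}!!{"value":"b"}!!{"value":"c"}!!
    Seq("a", "b", "c").toDF("value")
      .coalesce(1)
      .write
      .mode("overwrite")
      .option("lineSep", "!!")
      .json(path)

    // Read the records back with the same separator. Without the option the
    // reader falls back to the default separators (\r, \r\n and \n) and cannot
    // split these records.
    val readBack = spark.read
      .option("lineSep", "!!")
      .json(path)
    readBack.show()

    spark.stop()
  }
}

For the line-based (non-multiLine) read path, `JSONOptions.getTextOptions` hands the separator to the text-level reader in the `x`-prefixed hex form (one `xNN` group per byte), which is the same hex selector that the reworked `TextOptions` parses and that `checkReadJson` exercises with values such as `x0a 0d`.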