From d557892080c8d6ec33dd7a13f4b8cdad88b440b0 Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Mon, 25 Sep 2017 17:31:36 +0800
Subject: [PATCH 01/10] add csv from `RDD[String]` API and related test case

---
 .../org/apache/spark/sql/DataFrameReader.scala     |  4 ++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 16 ++++++++++++++++
 2 files changed, 20 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 78b668c04fd5..3171fad837e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,6 +455,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     csv(Seq(path): _*)
   }
 
+  def csv(csvRDD: RDD[String]): DataFrame = {
+    csv(sparkSession.createDataset(csvRDD)(Encoders.STRING))
+  }
+
   /**
    * Loads a `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e439699605ab..f66e57d253cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -146,6 +146,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
   }
 
+  test("simple csv test with string RDD") {
+    val csvRDD = spark.sparkContext.textFile(carsFile)
+    val cars = spark.read
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .csv(csvRDD)
+
+    verifyCars(cars, withHeader = true, checkTypes = true)
+
+    val carsWithoutHeader = spark.read
+      .option("header", "false")
+      .csv(csvRDD)
+
+    verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
+  }
+
   test("test inferring booleans") {
     val result = spark.read
       .format("csv")

From baaa93f5e837cdba02922e183a3f81c287e19854 Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Mon, 25 Sep 2017 17:50:34 +0800
Subject: [PATCH 02/10] fix test case

---
 .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f66e57d253cf..838ed6b88e46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -147,7 +147,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("simple csv test with string RDD") {
-    val csvRDD = spark.sparkContext.textFile(carsFile)
+    val csvRDD = spark.sparkContext.textFile(testFile(carsFile))
     val cars = spark.read
       .option("header", "true")
       .option("inferSchema", "true")
From d4ef30abdda142a969400c9e6e11a089a5483385 Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Mon, 25 Sep 2017 19:59:08 +0800
Subject: [PATCH 03/10] finish pyspark dataframe from rdd of csv string

---
 python/pyspark/sql/readwriter.py                   | 22 ++++++++++++++++++-
 .../apache/spark/sql/DataFrameReader.scala         |  2 ++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cb847a042031..7332d3014eed 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -336,6 +336,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         ``inferSchema`` option or specify the schema explicitly using ``schema``.
 
         :param path: string, or list of strings, for input path(s).
+                     or RDD of Strings storing CSV objects.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param sep: sets the single character as a separator for each field and value.
@@ -408,6 +409,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
         [('_c0', 'string'), ('_c1', 'string')]
+        >>> rdd = sc.textFile('python/test_support/sql/ages.csv')
+        >>> df2 = spark.read.csv(rdd)
+        >>> df2.dtypes
+        [('_c0', 'string'), ('_c1', 'string')]
         """
         self._set_opts(
             schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
@@ -420,7 +425,22 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            return self._df(self._jreader.csv(jrdd))
+        else:
+            raise TypeError("path can be only string, list or RDD")
 
     @since(1.5)
     def orc(self, path):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3171fad837e7..0e45367d87a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,6 +455,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     csv(Seq(path): _*)
   }
 
+  def csv(csvRDD: JavaRDD[String]): DataFrame = csv(csvRDD.rdd)
+
   def csv(csvRDD: RDD[String]): DataFrame = {
     csv(sparkSession.createDataset(csvRDD)(Encoders.STRING))
   }
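For orientation: once PATCH 03 is applied, the new `elif isinstance(path, RDD)` branch above is reachable from ordinary user code. A minimal PySpark sketch of what it enables — the inline rows and app name are fabricated for illustration:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("csv-rdd-sketch").getOrCreate()
    sc = spark.sparkContext

    # An RDD[String] whose elements are raw CSV rows (made-up sample data).
    csv_rdd = sc.parallelize(["Joe,20", "Tom,30"])

    # Dispatches into the RDD branch: each row is re-encoded as UTF-8 bytes,
    # converted into a JVM RDD[String] via BytesToString, and handed to the
    # JVM-side csv() overload added in this patch.
    df = spark.read.csv(csv_rdd)
    df.show()

    # Anything that is neither a string, a list, nor an RDD is rejected.
    try:
        spark.read.csv(42)
    except TypeError as e:
        print(e)  # path can be only string, list or RDD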
From 9bd4eed474fdfa20d5933558d519fb187694aa33 Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Mon, 25 Sep 2017 20:13:50 +0800
Subject: [PATCH 04/10] modified comments

---
 python/pyspark/sql/readwriter.py                   |  2 +-
 .../apache/spark/sql/DataFrameReader.scala         | 26 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7332d3014eed..1ed452d895b4 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -336,7 +336,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         ``inferSchema`` option or specify the schema explicitly using ``schema``.
 
         :param path: string, or list of strings, for input path(s).
-                     or RDD of Strings storing CSV objects.
+                     or RDD of Strings storing CSV rows.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param sep: sets the single character as a separator for each field and value.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0e45367d87a0..5d054370d8c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,8 +455,34 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     csv(Seq(path): _*)
   }
 
+  /**
+   * Loads a `JavaRDD[String]` storing CSV rows and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
+   * it determines the columns as string types and it reads only the first line to determine the
+   * names and the number of fields.
+   *
+   * @since 2.2.0
+   */
+  @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
   def csv(csvRDD: JavaRDD[String]): DataFrame = csv(csvRDD.rdd)
 
+  /**
+   * Loads an `RDD[String]` storing CSV rows and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
+   * it determines the columns as string types and it reads only the first line to determine the
+   * names and the number of fields.
+   *
+   * @since 2.2.0
+   */
+  @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
   def csv(csvRDD: RDD[String]): DataFrame = {
     csv(sparkSession.createDataset(csvRDD)(Encoders.STRING))
   }

From 7525b48d2b9b59b1d6ce74a145fc049cfce6529a Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Mon, 25 Sep 2017 20:14:55 +0800
Subject: [PATCH 05/10] modified comments

---
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 5d054370d8c1..0fb41e6faa2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -465,6 +465,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * it determines the columns as string types and it reads only the first line to determine the
    * names and the number of fields.
    *
+   * @param csvRDD input RDD with one CSV row per record
    * @since 2.2.0
    */
   @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
@@ -480,6 +481,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * it determines the columns as string types and it reads only the first line to determine the
    * names and the number of fields.
    *
+   * @param csvRDD input RDD with one CSV row per record
    * @since 2.2.0
    */
   @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
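The scaladoc added in PATCH 04 and PATCH 05 pins down the two schema paths (inference on or off). The same semantics from the Python side, sketched against the `ages.csv` doctest fixture in a shell where `spark` and `sc` are in scope; the inferred `int` for the second column is an assumption about that fixture's contents:

    rdd = sc.textFile('python/test_support/sql/ages.csv')

    # inferSchema disabled (the default): every column comes back as string,
    # and only the first line is read to find the number of fields.
    spark.read.csv(rdd).dtypes
    # [('_c0', 'string'), ('_c1', 'string')]

    # inferSchema enabled: one extra pass over the input to determine types.
    spark.read.option("inferSchema", "true").csv(rdd).dtypes
    # e.g. [('_c0', 'string'), ('_c1', 'int')] if the second column holds integers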
From 350a93d4ce0f7441c31909ee0aa5078eeb48c4a7 Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Tue, 26 Sep 2017 21:23:30 +0800
Subject: [PATCH 06/10] remove scala-side api

---
 .../apache/spark/sql/DataFrameReader.scala         | 34 -------------------
 .../execution/datasources/csv/CSVSuite.scala       | 16 ---------
 2 files changed, 50 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0fb41e6faa2a..78b668c04fd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,40 +455,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     csv(Seq(path): _*)
   }
 
-  /**
-   * Loads a `JavaRDD[String]` storing CSV rows and returns the result as a `DataFrame`.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
-   * this function goes through the input once to determine the input schema.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
-   * it determines the columns as string types and it reads only the first line to determine the
-   * names and the number of fields.
-   *
-   * @param csvRDD input RDD with one CSV row per record
-   * @since 2.2.0
-   */
-  @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
-  def csv(csvRDD: JavaRDD[String]): DataFrame = csv(csvRDD.rdd)
-
-  /**
-   * Loads an `RDD[String]` storing CSV rows and returns the result as a `DataFrame`.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
-   * this function goes through the input once to determine the input schema.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
-   * it determines the columns as string types and it reads only the first line to determine the
-   * names and the number of fields.
-   *
-   * @param csvRDD input RDD with one CSV row per record
-   * @since 2.2.0
-   */
-  @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
-  def csv(csvRDD: RDD[String]): DataFrame = {
-    csv(sparkSession.createDataset(csvRDD)(Encoders.STRING))
-  }
-
   /**
    * Loads a `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 838ed6b88e46..e439699605ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -146,22 +146,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
   }
 
-  test("simple csv test with string RDD") {
-    val csvRDD = spark.sparkContext.textFile(testFile(carsFile))
-    val cars = spark.read
-      .option("header", "true")
-      .option("inferSchema", "true")
-      .csv(csvRDD)
-
-    verifyCars(cars, withHeader = true, checkTypes = true)
-
-    val carsWithoutHeader = spark.read
-      .option("header", "false")
-      .csv(csvRDD)
-
-    verifyCars(carsWithoutHeader, withHeader = false, checkTypes = false)
-  }
-
   test("test inferring booleans") {
     val result = spark.read
       .format("csv")

From 4040103886410620c557f72372f9a7ba87485afe Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Tue, 26 Sep 2017 21:25:20 +0800
Subject: [PATCH 07/10] use java dataset to wrap rdd api

---
 python/pyspark/sql/readwriter.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ed452d895b4..074432e089bf 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -438,7 +438,10 @@ def func(iterator):
             keyed = path.mapPartitions(func)
             keyed._bypass_serializer = True
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
-            return self._df(self._jreader.csv(jrdd))
+            jdataset = self._spark._ssql_ctx.createDataset(
+                jrdd.rdd(),
+                self._spark._sc._jvm.Encoders.STRING())
+            return self._df(self._jreader.csv(jdataset))
         else:
             raise TypeError("path can be only string, list or RDD")
 

From f5429677ac66ded20b63dc9641ff69193c08897f Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Tue, 26 Sep 2017 22:39:07 +0800
Subject: [PATCH 08/10] simplify the code and add comments

---
 python/pyspark/sql/readwriter.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 074432e089bf..f8fc800a2026 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -438,9 +438,13 @@ def func(iterator):
             keyed = path.mapPartitions(func)
             keyed._bypass_serializer = True
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            # [SPARK-22112]
+            # There isn't any jvm api for creating a dataframe from an rdd storing csv.
+            # We can do it by creating a jvm dataset first and using the jvm api
+            # for creating a dataframe from a dataset storing csv.
            jdataset = self._spark._ssql_ctx.createDataset(
                jrdd.rdd(),
-                self._spark._sc._jvm.Encoders.STRING())
+                self._spark._jvm.Encoders.STRING())
             return self._df(self._jreader.csv(jdataset))
         else:
             raise TypeError("path can be only string, list or RDD")

From 5988336af152c91ffa18bc40ecb2846d0e6287a3 Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Tue, 26 Sep 2017 22:51:11 +0800
Subject: [PATCH 09/10] fix some comment

---
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f8fc800a2026..191609b1bbed 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -438,7 +438,7 @@ def func(iterator):
             keyed = path.mapPartitions(func)
             keyed._bypass_serializer = True
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
-            # [SPARK-22112]
+            # see SPARK-22112
             # There isn't any jvm api for creating a dataframe from an rdd storing csv.
             # We can do it by creating a jvm dataset first and using the jvm api
             # for creating a dataframe from a dataset storing csv.

From 032b0c813418f1533e8e61b3c6f84cf2b48ef6be Mon Sep 17 00:00:00 2001
From: goldmedal
Date: Tue, 26 Sep 2017 22:55:35 +0800
Subject: [PATCH 10/10] fix some comment

---
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 191609b1bbed..f3092918abb5 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -335,7 +335,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         ``inferSchema`` is enabled. To avoid going through the entire data once, disable
         ``inferSchema`` option or specify the schema explicitly using ``schema``.
 
-        :param path: string, or list of strings, for input path(s).
+        :param path: string, or list of strings, for input path(s),
                      or RDD of Strings storing CSV rows.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
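Taken together, the series drops the Scala-side overloads again and lands the feature as a Python-only entry point built on `csv(Dataset[String])`. A minimal end-to-end sketch of the final behavior; the file path and DDL schema string are illustrative, with the schema-as-DDL form being the one documented in the `:param schema:` text above:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # Any RDD[String] of CSV rows works; reading a text file is the common case.
    cars_rdd = sc.textFile("data/cars.csv")

    # Options behave exactly as for the path-based csv() reader.
    cars = (spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(cars_rdd))

    # An explicit schema avoids the extra pass over the data.
    fixed = spark.read.csv(cars_rdd, schema="year INT, make STRING, model STRING")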