From 673c29b2166e002d97b914ef8f8316df71fc8be7 Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Sat, 10 Sep 2016 10:02:21 +0800
Subject: [PATCH 01/11] solve SPARK-17447

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 98c3abe93b553..54596d0650a55 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,16 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
-    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
-      return r.partitioner.get
+    val rdds = Seq(rdd) ++ others
+
+    val filteredRdds = rdds.filter( _.partitioner.exists(_.numPartitions > 0 ))
+    if(filteredRdds.nonEmpty) {
+      return filteredRdds.maxBy( _.partitions.length).partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
       new HashPartitioner(rdd.context.defaultParallelism)
     } else {
-      new HashPartitioner(bySize.head.partitions.length)
+      new HashPartitioner(rdds.map(_.partitions.length).max)
     }
   }
 }

From a4609059350af3ebeb68e5acdfc99daf424a817a Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Sat, 10 Sep 2016 10:26:46 +0800
Subject: [PATCH 02/11] Update Partitioner.scala

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 54596d0650a55..9fb1c4c055519 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -56,7 +56,6 @@ object Partitioner {
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = Seq(rdd) ++ others
-
     val filteredRdds = rdds.filter( _.partitioner.exists(_.numPartitions > 0 ))
     if(filteredRdds.nonEmpty) {
       return filteredRdds.maxBy( _.partitions.length).partitioner.get
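A small, hypothetical driver snippet makes the code path above easier to follow (an active SparkContext `sc` is assumed and the partition counts are made up; pair-RDD operations such as `join` and `cogroup` call `Partitioner.defaultPartitioner` internally):

    import org.apache.spark.HashPartitioner

    // Sketch only: illustrates the path rewritten in PATCH 01/02.
    val a = sc.parallelize(1 to 100).map(i => (i, i))   // no partitioner defined
    val b = a.partitionBy(new HashPartitioner(8))       // partitioner with 8 partitions
    // defaultPartitioner(a, b) keeps only inputs whose partitioner has
    // numPartitions > 0 and picks the one with the most partitions via maxBy,
    // instead of sorting every input RDD by size first.
    val joined = a.join(b)                              // reuses b's HashPartitioner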
From 7829bd0a3c66c474ec67f64d1ef043d0e251cdf6 Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Sat, 10 Sep 2016 20:26:54 +0800
Subject: [PATCH 03/11] solve SPARK-17447

---
 .../scala/org/apache/spark/Partitioner.scala  | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 54596d0650a55..31b7d89e35a41 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,16 +55,16 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val rdds = Seq(rdd) ++ others
-
-    val filteredRdds = rdds.filter( _.partitioner.exists(_.numPartitions > 0 ))
-    if(filteredRdds.nonEmpty) {
-      return filteredRdds.maxBy( _.partitions.length).partitioner.get
-    }
-    if (rdd.context.conf.contains("spark.default.parallelism")) {
-      new HashPartitioner(rdd.context.defaultParallelism)
+    val rdds = (Seq(rdd) ++ others)
+    val hashPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
+    if (hashPartitioner.nonEmpty) {
+      hashPartitioner.maxBy(_.partitions.length).partitioner.get
     } else {
-      new HashPartitioner(rdds.map(_.partitions.length).max)
+      if (rdd.context.conf.contains("spark.default.parallelism")) {
+        new HashPartitioner(rdd.context.defaultParallelism)
+      } else {
+        new HashPartitioner(rdds.map(_.partitions.length).max)
+      }
     }
   }
 }

From 8ddc442fc40f71d85fcaef8e4a721f6b31a5ea5c Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Sat, 10 Sep 2016 20:33:19 +0800
Subject: [PATCH 04/11] fix code style

---
 .../scala/org/apache/spark/Partitioner.scala  | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9fb1c4c055519..31b7d89e35a41 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,15 +55,16 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val rdds = Seq(rdd) ++ others
-    val filteredRdds = rdds.filter( _.partitioner.exists(_.numPartitions > 0 ))
-    if(filteredRdds.nonEmpty) {
-      return filteredRdds.maxBy( _.partitions.length).partitioner.get
-    }
-    if (rdd.context.conf.contains("spark.default.parallelism")) {
-      new HashPartitioner(rdd.context.defaultParallelism)
+    val rdds = (Seq(rdd) ++ others)
+    val hashPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
+    if (hashPartitioner.nonEmpty) {
+      hashPartitioner.maxBy(_.partitions.length).partitioner.get
     } else {
-      new HashPartitioner(rdds.map(_.partitions.length).max)
+      if (rdd.context.conf.contains("spark.default.parallelism")) {
+        new HashPartitioner(rdd.context.defaultParallelism)
+      } else {
+        new HashPartitioner(rdds.map(_.partitions.length).max)
+      }
     }
   }
 }
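When none of the inputs carries a partitioner, the rewritten method falls through to the HashPartitioner branch; a minimal sketch of that case, with assumed partition counts and an existing SparkContext `sc`:

    val x = sc.parallelize(1 to 10, 4).map(i => (i, i))   // 4 partitions, no partitioner
    val y = sc.parallelize(1 to 10, 2).map(i => (i, i))   // 2 partitions, no partitioner
    // Neither input defines a partitioner, so defaultPartitioner returns a
    // HashPartitioner sized by spark.default.parallelism when that is set,
    // otherwise by the largest partition count among the inputs (4 here).
    val grouped = x.cogroup(y)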
From f5d1e24d38f4a24f2ebc29214eb1a331846a0b1b Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Sat, 10 Sep 2016 23:21:44 +0800
Subject: [PATCH 05/11] Update Partitioner.scala

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 31b7d89e35a41..93dfbc0e6ed65 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -56,9 +56,9 @@ object Partitioner {
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val rdds = (Seq(rdd) ++ others)
-    val hashPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
-    if (hashPartitioner.nonEmpty) {
-      hashPartitioner.maxBy(_.partitions.length).partitioner.get
+    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
+    if (hasPartitioner.nonEmpty) {
+      hasPartitioner.maxBy(_.partitions.length).partitioner.get
     } else {
       if (rdd.context.conf.contains("spark.default.parallelism")) {
         new HashPartitioner(rdd.context.defaultParallelism)

From e426ccfabeb4e9baa38bceac893db7d985cfa860 Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Tue, 13 Sep 2016 18:51:57 +0800
Subject: [PATCH 06/11] solve SPARK-17521

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e32e4aa5b8312..9994169a1c113 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -795,7 +795,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, defaultParallelism), indexToPrefs)
   }

   /**

From 8bfcd6b66950b40953d984fee93e8b16cbf7af05 Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Tue, 13 Sep 2016 21:00:12 +0800
Subject: [PATCH 07/11] fix

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9994169a1c113..35b6334832393 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -795,7 +795,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, defaultParallelism), indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
   }

   /**

From 379cd5a687d83c363179071a998bd689feb80e71 Mon Sep 17 00:00:00 2001
From: codlife
Date: Tue, 13 Sep 2016 21:03:39 +0800
Subject: [PATCH 08/11] Update SparkContext.scala

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9994169a1c113..35b6334832393 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -795,7 +795,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, defaultParallelism), indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
   }

   /**
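The SPARK-17521 commits above guard the location-preference overload of `makeRDD` against an empty input; a short sketch of the behaviour they target, assuming an active SparkContext `sc`:

    // Before the change numSlices was seq.size, so an empty sequence produced an
    // RDD with zero partitions; math.max(seq.size, 1) keeps at least one partition.
    val empty = sc.makeRDD(Seq.empty[(Int, Seq[String])])
    assert(empty.getNumPartitions >= 1)
    // The usual case is unchanged: one partition per element, with its preferred hosts.
    val withPrefs = sc.makeRDD(Seq((1, Seq("host1")), (2, Seq("host2"))))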
From 1d0d4fcf9d9c06a0d03fc8dd8ba7582e4945231a Mon Sep 17 00:00:00 2001
From: codlife <1004910847@qq.com>
Date: Mon, 17 Oct 2016 15:46:39 +0800
Subject: [PATCH 09/11] support standard json file

---
 .../apache/spark/sql/DataFrameReader.scala    | 20 +++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 93bf74d06b71d..c8953acf5e372 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -244,11 +244,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def json(path: String): DataFrame = {
+
+  def json(path: String, isStandard: Boolean = false ): DataFrame = {
     // This method ensures that calls that explicit need single argument works, see SPARK-16009
-    json(Seq(path): _*)
+    if (!isStandard) {
+      json(Seq(path): _*)
+    } else {
+      val jsonRDD = sparkSession.sparkContext.wholeTextFiles(path)
+        .map(line => line.toString().replaceAll("\\s+", ""))
+        .map { jsonLine =>
+          val index = jsonLine.indexOf(",")
+          jsonLine.substring(index + 1, jsonLine.length - 1)
+        }
+      sparkSession.read.json(jsonRDD)
+    }
   }

+  /**
+   * To keep compatible with spark-1.6 we provide this method see SPARK-16009
+   */
+  def json(path: String): DataFrame = format("json").load(path)
+
   /**
    * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
    *

From 2084079e419e10abe58999adacaa020dfdcef964 Mon Sep 17 00:00:00 2001
From: codlife
Date: Mon, 17 Oct 2016 16:35:10 +0800
Subject: [PATCH 10/11] Update DataFrameReader.scala

---
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ddb0c86e1c54b..0add861a2165d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -261,7 +261,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }

   /**
-   * To keep compatible with spark-1.6 we provide this method see SPARK-16009
+   * To keep compatible with spark-1.6 see SPARK-16009.
+   * Because the json(path: String, isStandard: Boolean = false) method will compile failed
+   * when we call Option(path).map(spark.read.json), we provide this method.
    */
   def json(path: String): DataFrame = format("json").load(path)

From 43bf4e5a1ca7ec53e499c095246ea77f0cb094aa Mon Sep 17 00:00:00 2001
From: codlife
Date: Mon, 17 Oct 2016 16:51:15 +0800
Subject: [PATCH 11/11] Update DataFrameReader.scala

---
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0add861a2165d..b7abd8c2575c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -240,6 +240,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
+   * With param isStandard we can load multi-lines json object directly.
    * See the documentation on the overloaded `json()` method with varargs for more details.
    *
    * @since 1.4.0
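A hedged usage sketch of the reader change proposed in the last three commits (the isStandard flag is introduced by this patch series rather than being an existing Spark option, and the paths are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("json-example").getOrCreate()
    // Default behaviour is unchanged: one JSON object per line.
    val lineDf = spark.read.json("/path/to/line-delimited.json")
    // With isStandard = true the file is read through wholeTextFiles and treated
    // as a single, possibly multi-line (pretty-printed) JSON document.
    val wholeDf = spark.read.json("/path/to/pretty-printed.json", isStandard = true)
    wholeDf.printSchema()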