From 4e385f3d5a86671fc77a72f7f4738f71e4ce0b47 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Fri, 19 Aug 2016 13:34:47 -0700
Subject: [PATCH 01/11] minor change

---
 core/src/main/scala/org/apache/spark/SparkContext.scala     | 2 +-
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 60f042f1e07c5..cee8dcd58f42b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1220,7 +1220,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Methods for creating shared variables

   /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, to which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
   @deprecated("use AccumulatorV2", "2.0.0")
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 6cbd5ae5d428a..083accd84ace1 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -275,7 +275,7 @@ private[spark] object AccumulatorSuite {
     sc.listenerBus.waitUntilEmpty(10 * 1000)
     val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
     val isSet = accums.exists { a =>
-      a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
+      a.name.contains(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
     }
     if (!isSet) {
       throw new TestFailedException(s"peak execution memory accumulator not set in '$testName'", 0)

From 71ae4e5ddc5681a1ef3953f1a68f6b6cf6f38b24 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Fri, 19 Aug 2016 14:46:42 -0700
Subject: [PATCH 02/11] more specific exception, consistent with other tests

---
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 083accd84ace1..ffc84b5a29008 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -100,7 +100,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val acc: Accumulator[Int] = sc.accumulator(0)

     val d = sc.parallelize(1 to 20)
-    an [Exception] should be thrownBy {d.foreach{x => acc.value = x}}
+    an [SparkException] should be thrownBy {d.foreach{x => acc.value = x}}
   }

   test ("add value to collection accumulators") {
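Note on PATCH 01: for any Option[A], `opt.contains(x)` is equivalent to `opt == Some(x)` but states the intent directly and avoids allocating a Some just for the comparison (Option.contains is available since Scala 2.11). A minimal, self-contained sketch, with values that are hypothetical rather than taken from the Spark sources:

    object OptionContainsDemo {
      def main(args: Array[String]): Unit = {
        val name: Option[String] = Some("peakExecutionMemory")

        // The two checks are equivalent for a defined Option...
        assert(name.contains("peakExecutionMemory"))
        assert(name == Some("peakExecutionMemory"))

        // ...and both are false for None.
        val empty: Option[String] = None
        assert(!empty.contains("peakExecutionMemory"))
      }
    }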
From c9a119236399657d5dd59b85b2fae8c02528d788 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Mon, 22 Aug 2016 12:49:14 -0700
Subject: [PATCH 03/11] tmp save

---
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index ffc84b5a29008..9c75aeadaa7f7 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -171,7 +171,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
       d.foreach {
         x => acc.localValue ++= x
       }
-      acc.value should be ( (0 to maxI).toSet)
+      acc.value should be ((0 to maxI).toSet)
       resetSparkContext()
     }
   }

From 067d3b772b59400ab228c5df4fe88e3dab14aa32 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Thu, 25 Aug 2016 15:08:32 -0700
Subject: [PATCH 04/11] remove unused import

---
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c060091c7fc38..93bf74d06b71d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
 import org.apache.spark.sql.types.StructType

From 221a5bb4995cb61c4edff91fd226d6e2310e006b Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Thu, 25 Aug 2016 15:36:37 -0700
Subject: [PATCH 05/11] remove unused import

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6da99ce0dd683..e7dcf0f51f4a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}

From 039c3b08b28d9c178963318f1eb133aae1087643 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Thu, 25 Aug 2016 17:49:09 -0700
Subject: [PATCH 06/11] remove redundant conversion

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index fbf22197a1a11..2edf2e1972053 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1093,7 +1093,7 @@ object SQLContext {
     }
     data.map{ element =>
       new GenericInternalRow(
-        methodsToConverts.map { case (e, convert) => convert(e.invoke(element)) }.toArray[Any]
+        methodsToConverts.map { case (e, convert) => convert(e.invoke(element)) }
       ): InternalRow
     }
   }
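Note on PATCH 06: the dropped `.toArray[Any]` was redundant because mapping over an Array already yields an Array, and when the mapped function returns Any the result is already an Array[Any]; the extra call only made a second copy. A pure-Scala sketch, with hypothetical converter functions standing in for the (Method, converter) pairs in SQLContext:

    object RedundantToArrayDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-ins for the methodsToConverts pairs.
        val methodsToConverts: Array[(String, Int => Any)] =
          Array(("toString", _.toString), ("double", x => x * 2))

        // map on an Array with a function returning Any already
        // produces Array[Any] -- no .toArray[Any] needed.
        val row: Array[Any] = methodsToConverts.map { case (_, convert) => convert(42) }
        assert(row.sameElements(Array[Any]("42", 84)))
      }
    }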
From ce19b11e4dc8fa199edffa8821b895125810f4f2 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Thu, 25 Aug 2016 17:53:10 -0700
Subject: [PATCH 07/11] val plan is not used

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index cd485770d269c..ce84696e48a5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1452,7 +1452,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
       val df = spark.range(100).toDF()
       val join = df.join(df, "id")
-      val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)
       assert(
         join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
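Note on PATCH 07: the deleted `val plan` was a dead store, which the compiler can flag automatically. A sketch of the relevant settings in an sbt build; the flag names below are from Scala 2.11-era scalac (an assumption, check them against the compiler version in use):

    // build.sbt (sketch): surface unused-definition warnings at compile time.
    scalacOptions ++= Seq(
      "-Ywarn-unused",  // warn when local and private definitions are unused
      "-Xlint"          // enable additional lint warnings
    )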
From 43c444b61e216187aab493c7d0eef21519f93ae2 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Fri, 26 Aug 2016 13:27:07 -0700
Subject: [PATCH 08/11] more style improvements

---
 .../test/scala/org/apache/spark/AccumulatorSuite.scala     | 2 +-
 .../scala/org/apache/spark/ml/feature/Interaction.scala    | 2 +-
 .../org/apache/spark/ml/r/IsotonicRegressionWrapper.scala  | 2 +-
 .../main/scala/org/apache/spark/ml/util/stopwatches.scala  | 2 +-
 .../org/apache/spark/mllib/feature/ChiSqSelector.scala     | 2 +-
 .../scala/org/apache/spark/mllib/random/RandomRDDs.scala   | 8 ++++----
 .../spark/sql/execution/datasources/DataSource.scala       | 2 +-
 7 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 9c75aeadaa7f7..7d6483d626e67 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -275,7 +275,7 @@ private[spark] object AccumulatorSuite {
     sc.listenerBus.waitUntilEmpty(10 * 1000)
     val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
     val isSet = accums.exists { a =>
-      a.name.contains(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
+      a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
     }
     if (!isSet) {
       throw new TestFailedException(s"peak execution memory accumulator not set in '$testName'", 0)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
index 96d0bdee9e2b9..902f84f862c17 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
@@ -136,7 +136,7 @@ class Interaction @Since("1.6.0") (@Since("1.6.0") override val uid: String) ext
         case _: VectorUDT =>
           val attrs = AttributeGroup.fromStructField(f).attributes.getOrElse(
             throw new SparkException("Vector attributes must be defined for interaction."))
-          attrs.map(getNumFeatures).toArray
+          attrs.map(getNumFeatures)
       }
       new FeatureEncoder(numFeatures)
     }.toArray
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index 1ea80cb46ab7b..a7992debe684a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -23,7 +23,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{AttributeGroup}
+import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.feature.RFormula
 import org.apache.spark.ml.regression.{IsotonicRegression, IsotonicRegressionModel}
 import org.apache.spark.ml.util._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
index e79b1f31643d0..e539deca4b036 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.util
 import scala.collection.mutable

 import org.apache.spark.SparkContext
-import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.util.LongAccumulator

 /**
  * Abstract class for stopwatches.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 56fb2d33c2ca0..33a1f18bccca5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -164,7 +164,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
         case Row(feature: Int) => (feature)
       }.collect()

-      return new ChiSqSelectorModel(features)
+      new ChiSqSelectorModel(features)
     }
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
index c2bc1f17ccd58..6d60136ddc38f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
@@ -438,10 +438,10 @@ object RandomRDDs {
   @DeveloperApi
   @Since("1.6.0")
   def randomJavaRDD[T](
-    jsc: JavaSparkContext,
-    generator: RandomDataGenerator[T],
-    size: Long): JavaRDD[T] = {
-    randomJavaRDD(jsc, generator, size, 0);
+      jsc: JavaSparkContext,
+      generator: RandomDataGenerator[T],
+      size: Long): JavaRDD[T] = {
+    randomJavaRDD(jsc, generator, size, 0)
   }

   // TODO Generate RDD[Vector] from multivariate distributions.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b783d699745b1..348530888de3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -61,7 +61,7 @@ import org.apache.spark.util.Utils
  *                  qualified. This option only works when reading from a [[FileFormat]].
  * @param userSpecifiedSchema An optional specification of the schema of the data. When present
  *                            we skip attempting to infer the schema.
- * @param partitionColumns A list of column names that the relation is partitioned by.  When this
+ * @param partitionColumns A list of column names that the relation is partitioned by. When this
 *                          list is empty, the relation is unpartitioned.
  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
  */
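Note on PATCH 08: the edits follow the Spark Scala style conventions -- no semicolons, no explicit `return` (the last expression of a block is its value), no braces around a single import, and wrapped method parameters indented four spaces. A condensed sketch with a hypothetical Widget type, not taken from the Spark sources:

    object StyleDemo {
      final case class Widget(name: String, size: Int)

      // Discouraged form, roughly what the patch removes:
      //   def makeWidget(
      //     name: String,
      //     size: Int): Widget = {
      //     return Widget(name, size);
      //   }

      // Spark style: parameters indented four spaces, no `return`, no semicolon.
      def makeWidget(
          name: String,
          size: Int): Widget = {
        Widget(name, size)  // the last expression is the method's result
      }

      def main(args: Array[String]): Unit =
        assert(makeWidget("w", 1) == Widget("w", 1))
    }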
From 70a751c6959048e65c083ab775b01523da4578a2 Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Fri, 26 Aug 2016 14:32:58 -0700
Subject: [PATCH 09/11] revert the wrong fix

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 970702fb61d59..08d6343d623cf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1238,7 +1238,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Methods for creating shared variables

   /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, to which tasks can "add"
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
   @deprecated("use AccumulatorV2", "2.0.0")

From a5e1db3a6978699300dd4b04901fa2f504006a5a Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Fri, 26 Aug 2016 16:35:54 -0700
Subject: [PATCH 10/11] minor fix after review

---
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 7d6483d626e67..262353e4dcb69 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -100,7 +100,9 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val acc: Accumulator[Int] = sc.accumulator(0)

     val d = sc.parallelize(1 to 20)
-    an [SparkException] should be thrownBy {d.foreach{x => acc.value = x}}
+    intercept[SparkException] {
+      d.foreach(x => acc.value = x)
+    }
   }

   test ("add value to collection accumulators") {
@@ -171,7 +173,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
       d.foreach {
         x => acc.localValue ++= x
       }
-      acc.value should be ((0 to maxI).toSet)
+      assert(acc.value == (0 to maxI).toSet)
       resetSparkContext()
     }
   }
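Note on PATCH 10: ScalaTest's `intercept[E] { ... }` fails the test unless the block throws an E, and it returns the caught exception so the test can inspect it; `an [E] should be thrownBy { ... }` from Matchers asserts the same thing in matcher style. A minimal hypothetical suite (not from the Spark tree), using 2016-era ScalaTest imports:

    import org.scalatest.{FunSuite, Matchers}

    class ExceptionStyleSuite extends FunSuite with Matchers {
      test("both styles assert that the block throws") {
        // intercept returns the exception, so its message can be checked.
        val e = intercept[IllegalArgumentException] {
          require(false, "boom")
        }
        assert(e.getMessage.contains("boom"))

        // The equivalent check in Matchers style.
        an [IllegalArgumentException] should be thrownBy {
          require(false, "boom")
        }
      }
    }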
From 5ac404caffd7b9a0d5167f40da62782e5c2fa9fe Mon Sep 17 00:00:00 2001
From: Xin Ren
Date: Mon, 29 Aug 2016 10:23:00 -0700
Subject: [PATCH 11/11] undo changes

---
 core/src/test/scala/org/apache/spark/AccumulatorSuite.scala  | 2 +-
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 262353e4dcb69..6d03ee091e4ed 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -173,7 +173,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
       d.foreach {
         x => acc.localValue ++= x
       }
-      assert(acc.value == (0 to maxI).toSet)
+      acc.value should be ((0 to maxI).toSet)
       resetSparkContext()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ce84696e48a5b..cd485770d269c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1452,6 +1452,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
       val df = spark.range(100).toDF()
       val join = df.join(df, "id")
+      val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)
       assert(
         join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
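Note on PATCH 11: it restores the Matchers-style assertion that PATCH 10 had rewritten, keeping AccumulatorSuite consistent with the `should be` style used elsewhere in that file. The two forms check the same condition; a small sketch with hypothetical values:

    import org.scalatest.{FunSuite, Matchers}

    class AssertionStyleSuite extends FunSuite with Matchers {
      test("assert and `should be` are interchangeable here") {
        val value = (0 to 3).toSet

        assert(value == Set(0, 1, 2, 3))   // plain assertion
        value should be (Set(0, 1, 2, 3))  // Matchers style, richer failure message
      }
    }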