diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 9c8b2d037558..62c185a91d85 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -90,7 +90,7 @@ private[sql] case class AvroDataToCatalyst(
   @transient private lazy val nullResultRow: Any = dataType match {
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
-      for(i <- 0 until st.length) {
+      for (i <- 0 until st.length) {
         resultRow.setNullAt(i)
       }
       resultRow
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index c1a05bf54707..8497b4a13e57 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -389,7 +389,7 @@ private[spark] object SerDe {
         writeType(dos, "map")
         writeInt(dos, v.size)
         val iter = v.entrySet.iterator
-        while(iter.hasNext) {
+        while (iter.hasNext) {
           val entry = iter.next
           val key = entry.getKey
           val value = entry.getValue
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 86e738d98693..ce9f70c9a83e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -317,7 +317,7 @@ object HistoryServer extends Logging {
     ShutdownHookManager.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
-    while(true) { Thread.sleep(Int.MaxValue) }
+    while (true) { Thread.sleep(Int.MaxValue) }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index de517acbf8c5..b9d88266ed53 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -151,7 +151,7 @@ private[spark] class HadoopDelegationTokenManager(
           creds.addAll(newTokens)
         }
       })
-      if(!currentUser.equals(freshUGI)) {
+      if (!currentUser.equals(freshUGI)) {
         FileSystem.closeAllForUGI(freshUGI)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index a806b72766c6..126c92e4cb65 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -240,7 +240,7 @@ private object PipedRDD {
   def tokenize(command: String): Seq[String] = {
     val buf = new ArrayBuffer[String]
     val tok = new StringTokenizer(command)
-    while(tok.hasMoreElements) {
+    while (tok.hasMoreElements) {
       buf += tok.nextToken()
     }
     buf.toSeq
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a3d074ddd56c..fdc82285b76b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -806,7 +806,7 @@ private[spark] class TaskSetManager(
     val info = taskInfos(tid)
     // SPARK-37300: when the task was already finished state, just ignore it,
    // so that there won't cause successful and tasksSuccessful wrong result.
-    if(info.finished) {
+    if (info.finished) {
      if (dropTaskInfoAccumulablesOnTaskCompletion) {
        // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
        // lifetime.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 73e72b7f1dfc..80e6ab7c0a66 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -238,7 +238,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
       f: BlockInfo => Boolean): Option[BlockInfo] = {
     var done = false
     var result: Option[BlockInfo] = None
-    while(!done) {
+    while (!done) {
       val wrapper = blockInfoWrappers.get(blockId)
       if (wrapper == null) {
         done = true
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 06857dff99c1..d99bc5bf3054 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1516,7 +1516,7 @@ private[spark] class BlockManager(
         return true
       }
 
-      if(master.isRDDBlockVisible(blockId)) {
+      if (master.isRDDBlockVisible(blockId)) {
         // Cache the visibility status if block exists.
         blockInfoManager.tryMarkBlockAsVisible(blockId)
         true
@@ -1882,7 +1882,7 @@ private[spark] class BlockManager(
       blockId,
       numPeersToReplicateTo)
 
-    while(numFailures <= maxReplicationFailureCount &&
+    while (numFailures <= maxReplicationFailureCount &&
       peersForReplication.nonEmpty &&
       peersReplicatedTo.size < numPeersToReplicateTo) {
       val peer = peersForReplication.head
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 6bb5058f5ed1..7245d87a8bab 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -45,7 +45,7 @@ class BitSet(numBits: Int) extends Serializable {
   def setUntil(bitIndex: Int): Unit = {
     val wordIndex = bitIndex >> 6 // divide by 64
     Arrays.fill(words, 0, wordIndex, -1)
-    if(wordIndex < words.length) {
+    if (wordIndex < words.length) {
       // Set the remaining bits (note that the mask could still be zero)
       val mask = ~(-1L << (bitIndex & 0x3f))
       words(wordIndex) |= mask
@@ -58,7 +58,7 @@ class BitSet(numBits: Int) extends Serializable {
   def clearUntil(bitIndex: Int): Unit = {
     val wordIndex = bitIndex >> 6 // divide by 64
     Arrays.fill(words, 0, wordIndex, 0)
-    if(wordIndex < words.length) {
+    if (wordIndex < words.length) {
       // Clear the remaining bits
       val mask = -1L << (bitIndex & 0x3f)
       words(wordIndex) &= mask
@@ -75,7 +75,7 @@ class BitSet(numBits: Int) extends Serializable {
     assert(newBS.numWords >= numWords)
     assert(newBS.numWords >= other.numWords)
     var ind = 0
-    while( ind < smaller ) {
+    while (ind < smaller) {
      newBS.words(ind) = words(ind) & other.words(ind)
      ind += 1
    }
@@ -92,15 +92,15 @@ class BitSet(numBits: Int) extends Serializable {
     assert(newBS.numWords >= other.numWords)
     val smaller = math.min(numWords, other.numWords)
     var ind = 0
-    while( ind < smaller ) {
+    while (ind < smaller) {
       newBS.words(ind) = words(ind) | other.words(ind)
       ind += 1
     }
-    while( ind < numWords ) {
+    while (ind < numWords) {
       newBS.words(ind) = words(ind)
       ind += 1
     }
-    while( ind < other.numWords ) {
+    while (ind < other.numWords) {
       newBS.words(ind) = other.words(ind)
       ind += 1
     }
@@ -242,7 +242,7 @@ class BitSet(numBits: Int) extends Serializable {
   def union(other: BitSet): Unit = {
     require(this.numWords <= other.numWords)
     var ind = 0
-    while( ind < this.numWords ) {
+    while (ind < this.numWords) {
       this.words(ind) = this.words(ind) | other.words(ind)
       ind += 1
     }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8bb96a0f53c7..aecb8b99d0e3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -792,7 +792,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   test("randomSplit") {
     val n = 600
     val data = sc.parallelize(1 to n, 2)
-    for(seed <- 1 to 5) {
+    for (seed <- 1 to 5) {
       val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed)
       assert(splits.length == 3, "wrong number of splits")
       assert(splits.flatMap(_.collect()).sorted.toList == data.collect().toList,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
index 9ce4acc75ec4..8f15e8cf1941 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala
@@ -128,7 +128,7 @@ class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplication
       assert(prioritizedPeers.toSet.size == numReplicas)
       val priorityPeers = prioritizedPeers.take(2)
       assert(priorityPeers.forall(p => p.host != blockManager.host))
-      if(numReplicas > 1) {
+      if (numReplicas > 1) {
         // both these conditions should be satisfied when numReplicas > 1
         assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo))
         assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo))
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index f649c310b34a..370c6fcd434c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -90,7 +90,7 @@ object LocalKMeans {
 
     println(s"Initial centers: $kPoints")
 
-    while(tempDist > convergeDist) {
+    while (tempDist > convergeDist) {
       val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
 
       val mappings = closest.groupBy[Int] (x => x._1)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index a6e1de7f5f2a..a0cd91a582dc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -79,7 +79,7 @@ object SparkKMeans {
     val kPoints = data.takeSample(withReplacement = false, K, 42)
     var tempDist = 1.0
 
-    while(tempDist > convergeDist) {
+    while (tempDist > convergeDist) {
       val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
 
       val pointStats = closest.reduceByKey(mergeResults)
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala
index ea794c700ae7..8659e37c6a7f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala
@@ -40,7 +40,7 @@ object Word2VecExample {
 
     val synonyms = model.findSynonyms("1", 5)
 
-    for((synonym, cosineSimilarity) <- synonyms) {
+    for ((synonym, cosineSimilarity) <- synonyms) {
       println(s"$synonym $cosineSimilarity")
     }
 
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index c88a90768d7f..59d0641b62ce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -87,7 +87,7 @@ class CustomReceiver(host: String, port: Int)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
-     while(!isStopped() && userInput != null) {
+     while (!isStopped() && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
index 1565447af7b9..7caf36e85c79 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
@@ -121,7 +121,7 @@ class MultilabelMetrics @Since("1.2.0") (predictionAndLabels: RDD[(Array[Double]
   def f1Measure(label: Double): Double = {
     val p = precision(label)
     val r = recall(label)
-    if((p + r) == 0) 0.0 else 2 * p * r / (p + r)
+    if ((p + r) == 0) 0.0 else 2 * p * r / (p + r)
   }
 
   private lazy val sumTp = summary.tpPerClass.values.sum
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 465c5e605b8c..1bef7e50c046 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -70,7 +70,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
 
   override def onClose(e: WatcherException): Unit = {
     logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    if(e != null && e.isHttpGone) {
+    if (e != null && e.isHttpGone) {
       resourceTooOldReceived = true
       logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
     } else {
@@ -108,7 +108,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
       }
     }
 
-    if(podCompleted) {
+    if (podCompleted) {
       logInfo(
         pod.map { p => log"Container final statuses:\n\n${MDC(STATUS, containersDescription(p))}" }
           .getOrElse(log"No containers were found in the driver pod."))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 4dfd4cf7ca4f..ec01fe3ddc37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -85,7 +85,7 @@ object ExternalCatalogUtils {
     } else {
       val sb = new java.lang.StringBuilder(length + 16)
       if (firstIndex != 0) sb.append(path, 0, firstIndex)
-      while(firstIndex < length) {
+      while (firstIndex < length) {
         val c = path.charAt(firstIndex)
         if (needsEscaping(c)) {
           sb.append('%').append(HEX_CHARS((c & 0xF0) >> 4)).append(HEX_CHARS(c & 0x0F))
@@ -110,7 +110,7 @@ object ExternalCatalogUtils {
     } else {
       val sb = new java.lang.StringBuilder(length)
       var plaintextStartIdx = 0
-      while(plaintextEndIdx != -1 && plaintextEndIdx + 2 < length) {
+      while (plaintextEndIdx != -1 && plaintextEndIdx + 2 < length) {
         if (plaintextEndIdx > plaintextStartIdx) sb.append(path, plaintextStartIdx, plaintextEndIdx)
         val high = path.charAt(plaintextEndIdx + 1)
         if ((high >>> 8) == 0 && unhexDigits(high) != -1) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a9bbda56e870..284ca63d820f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -509,7 +509,7 @@ class InMemoryCatalog(
       try {
         val fs = tablePath.getFileSystem(hadoopConfig)
         fs.mkdirs(newPartPath)
-        if(!fs.rename(oldPartPath, newPartPath)) {
+        if (!fs.rename(oldPartPath, newPartPath)) {
          throw new IOException(s"Renaming partition path from $oldPartPath to " +
            s"$newPartPath returned false")
        }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 0df5a7019893..b1e3a4ad21e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -1026,7 +1026,7 @@ case class Cast(
       try doubleStr.toDouble catch {
         case _: NumberFormatException =>
           val d = Cast.processFloatingPointSpecialLiterals(doubleStr, false)
-          if(ansiEnabled && d == null) {
+          if (ansiEnabled && d == null) {
             throw QueryExecutionErrors.invalidInputInCastToNumberError(
               DoubleType, s, getContextOrNull())
           } else {
@@ -1602,7 +1602,7 @@ case class Cast(
       val util = IntervalUtils.getClass.getCanonicalName.stripSuffix("$")
       (c, evPrim, evNull) =>
         code"""$evPrim = $util.safeStringToInterval($c);
-          if(${evPrim} == null) {
+          if (${evPrim} == null) {
            ${evNull} = true;
          }
        """.stripMargin
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index 56ecbf550e45..609d457fbd06 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -337,11 +337,11 @@ case class CaseWhen(
 
     // This generates code like:
     //   caseWhenResultState = caseWhen_1(i);
-    //   if(caseWhenResultState != -1) {
+    //   if (caseWhenResultState != -1) {
     //     continue;
     //   }
     //   caseWhenResultState = caseWhen_2(i);
-    //   if(caseWhenResultState != -1) {
+    //   if (caseWhenResultState != -1) {
     //     continue;
     //   }
     //   ...
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index 80bcf156133e..7635690a4605 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -1124,7 +1124,7 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression)
 
   private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
     val hashMap = new mutable.LinkedHashMap[Any, Array[Option[Int]]]
-    for((z, array) <- Array((0, keys1), (1, keys2))) {
+    for ((z, array) <- Array((0, keys1), (1, keys2))) {
       var i = 0
       while (i < array.numElements()) {
         val key = array.get(i, keyType)
@@ -1146,7 +1146,7 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression)
 
   private def getKeysWithIndexesBruteForce(keys1: ArrayData, keys2: ArrayData) = {
     val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
-    for((z, array) <- Array((0, keys1), (1, keys2))) {
+    for ((z, array) <- Array((0, keys1), (1, keys2))) {
       var i = 0
       while (i < array.numElements()) {
         val key = array.get(i, keyType)
@@ -1156,7 +1156,7 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression)
           val (bufferKey, indexes) = arrayBuffer(j)
           if (ordering.equiv(bufferKey, key)) {
             found = true
-            if(indexes(z).isEmpty) {
+            if (indexes(z).isEmpty) {
               indexes(z) = Some(i)
             }
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 1d4b66557e47..77c76e6f5e34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1695,7 +1695,7 @@ case class ExternalMapToCatalyst private(
         final Object[] $convertedValues = new Object[$length];
         int $index = 0;
         $defineEntries
-        while($entries.hasNext()) {
+        while ($entries.hasNext()) {
           $defineKeyValue
           $keyNullCheck
           $valueNullCheck
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index 8f520fca4350..be7d227d8002 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -77,7 +77,7 @@ abstract class StringRegexExpression extends BinaryExpression
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = {
     val regex = pattern(input2.asInstanceOf[UTF8String].toString)
-    if(regex == null) {
+    if (regex == null) {
       null
     } else {
       matches(regex, input1.asInstanceOf[UTF8String].toString)
@@ -984,7 +984,7 @@ case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: Expres
   override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
     val m = getLastMatcher(s, p)
     val matchResults = new ArrayBuffer[UTF8String]()
-    while(m.find) {
+    while (m.find) {
       val mr: MatchResult = m.toMatchResult
       val index = r.asInstanceOf[Int]
       RegExpExtractBase.checkGroupIndex(prettyName, mr.groupCount, index)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index 10646130a910..a4672f9cd9f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -191,7 +191,7 @@ case class JoinEstimation(join: Join) extends Logging {
     }
     val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
     var i = 0
-    while(i < keyPairs.length && joinCard != 0) {
+    while (i < keyPairs.length && joinCard != 0) {
       val (leftKey, rightKey) = keyPairs(i)
       // Check if the two sides are disjoint
       val leftKeyStat = leftStats.attributeStats(leftKey)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index b5556cbae7cd..5ae2ca0d532b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -176,7 +176,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.optimizer.UnwrapCastInBinaryComparison" :: Nil
   }
 
-  if(Utils.isTesting) {
+  if (Utils.isTesting) {
     rulesNeedingIds = rulesNeedingIds ++ {
       // In the production code path, the following rules are run in CombinedTypeCoercionRule, and
       // hence we only need to add them for unit testing.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
index 8c1c50d970ec..d3401613dcd8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
@@ -188,7 +188,7 @@ class PercentileSuite extends SparkFunSuite {
       StringType, DateType, TimestampType,
       CalendarIntervalType, NullType)
 
-    for(dataType <- invalidDataTypes;
+    for (dataType <- invalidDataTypes;
        frequencyType <- validFrequencyTypes) {
      val child = AttributeReference("a", dataType)()
      val frq = AttributeReference("frq", frequencyType)()
@@ -207,7 +207,7 @@ class PercentileSuite extends SparkFunSuite {
       )
     }
 
-    for(dataType <- validDataTypes;
+    for (dataType <- validDataTypes;
        frequencyType <- invalidFrequencyDataTypes) {
      val child = AttributeReference("a", dataType)()
      val frq = AttributeReference("frq", frequencyType)()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala
index 93afef60a9dd..e93ec751a7f8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala
@@ -244,7 +244,7 @@ class ParserUtilsSuite extends SparkFunSuite {
           Some(context)
         case _ =>
           val it = ctx.children.iterator()
-          while(it.hasNext) {
+          while (it.hasNext) {
            it.next() match {
              case p: ParserRuleContext =>
                val childResult = findCastContext(p)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
index e2d8ac898804..40112979c6d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
@@ -272,13 +272,13 @@ trait AggregateCodegenSupport
 
     aggCodes.zip(aggregateExpressions.map(ae => (ae.mode, ae.filter))).map {
       case (aggCode, (Partial | Complete, Some(condition))) =>
-        // Note: wrap in "do { } while(false);", so the generated checks can jump out
+        // Note: wrap in "do { } while (false);", so the generated checks can jump out
         // with "continue;"
         s"""
           |do {
          |  ${generatePredicateCode(ctx, condition, inputAttrs, input)}
          |  $aggCode
-          |} while(false);
+          |} while (false);
        """.stripMargin
      case (aggCode, _) => aggCode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 083858e4fe80..995f857bbf63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -261,13 +261,13 @@ case class FilterExec(condition: Expression, child: SparkPlan)
       ev
     }
 
-    // Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
+    // Note: wrap in "do { } while (false);", so the generated checks can jump out with "continue;"
     s"""
       |do {
      |  $predicateCode
      |  $numOutput.add(1);
      |  ${consume(ctx, resultVars)}
-      |} while(false);
+      |} while (false);
    """.stripMargin
  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2f77b2c14b00..ae06e82335b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -603,7 +603,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       input = decompressStream(sourceStream)
 
       var eof = false
-      while(!eof) {
+      while (!eof) {
         val keySize = input.readInt()
         if (keySize == -1) {
           eof = true
@@ -656,7 +656,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       rawOutput = fm.createAtomic(targetFile, overwriteIfPossible = true)
       output = compressStream(rawOutput)
       val iter = map.iterator()
-      while(iter.hasNext) {
+      while (iter.hasNext) {
         val entry = iter.next()
         val keyBytes = entry.key.getBytes()
         val valueBytes = entry.value.getBytes()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 8f800b9f0252..20d8302acd49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -742,7 +742,7 @@ case class StateStoreSaveExec(
         new NextIterator[InternalRow] {
           override protected def getNext(): InternalRow = {
             var removedValueRow: InternalRow = null
-            while(rangeIter.hasNext && removedValueRow == null) {
+            while (rangeIter.hasNext && removedValueRow == null) {
               val rowPair = rangeIter.next()
               if (watermarkPredicateForKeysForEviction.get.eval(rowPair.key)) {
                 stateManager.remove(store, rowPair.key)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
index de8cf7a7b2d7..1cf342a45056 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
@@ -107,7 +107,7 @@ class SimpleWritableDataSource extends TestingV2Source {
     try {
       for (file <- fs.listStatus(jobPath).map(_.getPath)) {
         val dest = new Path(finalPath, file.getName)
-        if(!fs.rename(file, dest)) {
+        if (!fs.rename(file, dest)) {
          throw new IOException(s"failed to rename($file, $dest)")
        }
      }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
index eea2cb01ab46..7965d5c3c17a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
@@ -88,7 +88,7 @@ class SortBasedAggregationStoreSuite extends SparkFunSuite with LocalSparkConte
     }
 
     val iter = store.destructiveIterator()
-    while(iter.hasNext) {
+    while (iter.hasNext) {
       val agg = iter.next()
       assert(agg.aggregationBuffer.getInt(0) == expected(agg.groupingKey.getInt(0)))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index dfbc8e5279aa..d79709317c3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -490,7 +490,7 @@ abstract class JsonSuite
 
     // The following tests are about type coercion instead of JSON data source.
     // Here we simply forcus on the behavior of non-Ansi.
-    if(!SQLConf.get.ansiEnabled) {
+    if (!SQLConf.get.ansiEnabled) {
       // Number and Boolean conflict: resolve the type as number in this query.
       checkAnswer(
         sql("select num_bool - 10 from jsonTable where num_bool > 11"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
index 17dc70df42a6..fd81dcfe24d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
@@ -132,7 +132,7 @@ class ParquetDeltaLengthByteArrayEncodingSuite
     val samples: Array[String] = new Array[String](numSamples)
     for (i <- 0 until numSamples) {
       var maxLen: Int = randomLen.nextInt(maxLength)
-      if(randomEmpty.nextInt() % 11 != 0) {
+      if (randomEmpty.nextInt() % 11 != 0) {
        maxLen = 0;
      }
      samples(i) = RandomStringUtils.randomAlphanumeric(0, maxLen)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 68436c4e355b..6d9731fa63b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -867,7 +867,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
 
     (1 to iterations).foreach { i =>
       val rand = Random.nextDouble()
-      if(!running) {
+      if (!running) {
         rand match {
           case r if r < 0.7 => // AddData
             addRandomData()
@@ -895,7 +895,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
         }
       }
     }
-    if(!running) { actions += StartStream() }
+    if (!running) { actions += StartStream() }
     addCheck()
     testStream(ds)(actions.toSeq: _*)
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index 94f1e53371b9..969b1da6cd4d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -35,7 +35,7 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
   test("Spark's own GetSchemasOperation(SparkGetSchemasOperation)") {
     def checkResult(rs: ResultSet, dbNames: Seq[String]): Unit = {
       val expected = dbNames.iterator
-      while(rs.next() || expected.hasNext) {
+      while (rs.next() || expected.hasNext) {
         assert(rs.getString("TABLE_SCHEM") === expected.next())
         assert(rs.getString("TABLE_CATALOG").isEmpty)
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 194572be2a64..db2835263577 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1108,7 +1108,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
       try {
         fs.mkdirs(expectedPartitionPath)
-        if(!fs.rename(actualPartitionPath, expectedPartitionPath)) {
+        if (!fs.rename(actualPartitionPath, expectedPartitionPath)) {
          throw new IOException(s"Renaming partition path from $actualPartitionPath to " +
            s"$expectedPartitionPath returned false")
        }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 34b079219c99..83f36b760db3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -88,7 +88,7 @@ class SocketReceiver[T: ClassTag](
   def receive(): Unit = {
     try {
       val iterator = bytesToObjects(socket.getInputStream())
-      while(!isStopped() && iterator.hasNext) {
+      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1dde435a913c..856695ac2c98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -133,7 +133,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // Wait until all the received blocks in the network input tracker has
     // been consumed by network input DStreams, and jobs have been generated with them
     logInfo("Waiting for all received blocks to be consumed for job generation")
-    while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
+    while (!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
       Thread.sleep(pollTime)
     }
     logInfo("Waited for all received blocks to be consumed for job generation")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 3263f12a4e1e..b6439262981a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -67,7 +67,7 @@ object RawTextHelper {
     var swap: (String, Long) = null
     var count = 0
 
-    while(data.hasNext) {
+    while (data.hasNext) {
       value = data.next()
       if (value != null) {
         count += 1
@@ -80,7 +80,7 @@ object RawTextHelper {
         }
         taken(len - 1) = value
         i = len - 1
-        while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+        while (i > 0 && taken(i - 1)._2 < taken(i)._2) {
          swap = taken(i)
          taken(i) = taken(i-1)
          taken(i - 1) = swap
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
index 8069e7915b1d..c125bd13cd35 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
@@ -244,7 +244,7 @@ private[streaming] class OpenHashMapBasedStateMap[K, S](
     // allocate appropriately sized OpenHashMap.
     outputStream.writeInt(approxSize)
 
-    while(iterOfActiveSessions.hasNext) {
+    while (iterOfActiveSessions.hasNext) {
       parentSessionCount += 1
 
       val (key, state, updateTime) = iterOfActiveSessions.next()
@@ -294,7 +294,7 @@ private[streaming] class OpenHashMapBasedStateMap[K, S](
 
     // Read the records until the limit marking object has been reached
     var parentSessionLoopDone = false
-    while(!parentSessionLoopDone) {
+    while (!parentSessionLoopDone) {
       val obj = inputStream.readObject()
       obj match {
         case marker: LimitMarker =>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 2dc43a231d9b..793730ef0b3a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -218,7 +218,7 @@ object MasterFailureTest extends Logging {
     val checkpointDir = ssc.checkpointDir
     val batchDuration = ssc.graph.batchDuration
 
-    while(!isLastOutputGenerated && !isTimedOut) {
+    while (!isLastOutputGenerated && !isTimedOut) {
       // Get the output buffer
       val outputQueue = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
       def output = outputQueue.asScala.flatten
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 75edbb173faa..21c35b551239 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -154,7 +154,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     val startTimeNs = System.nanoTime()
     blockGenerator.start()
     var count = 0
-    while(System.nanoTime() - startTimeNs < TimeUnit.MILLISECONDS.toNanos(waitTime)) {
+    while (System.nanoTime() - startTimeNs < TimeUnit.MILLISECONDS.toNanos(waitTime)) {
      blockGenerator.addData(count)
      generatedData += count
      count += 1
@@ -389,7 +389,7 @@ class FakeReceiver(sendData: Boolean = false) extends Receiver[Int](StorageLevel
     receiving = true
     try {
       var count = 0
-      while(!isStopped()) {
+      while (!isStopped()) {
         if (sendData) {
           store(count)
           count += 1
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
index d0c56ecffcb8..a245753bece2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
@@ -160,7 +160,7 @@ class StateMapSuite extends SparkFunSuite {
       deltaChainThreshold = deltaChainThreshold)
 
     // Make large delta chain with length more than deltaChainThreshold
-    for(i <- 1 to targetDeltaLength) {
+    for (i <- 1 to targetDeltaLength) {
       map.put(Random.nextInt(), Random.nextInt(), 1)
       map = map.copy().asInstanceOf[OpenHashMapBasedStateMap[Int, Int]]
     }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a45d3b4bc8a9..b2d060b8e042 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -958,7 +958,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
     val thread = new Thread() {
       override def run(): Unit = {
         logInfo("Receiving started")
-        for(i <- 1 to totalRecords) {
+        for (i <- 1 to totalRecords) {
          Thread.sleep(1000 / recordsPerSecond)
          store(i)
        }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 01177f1cca70..1fbee8f09b56 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -163,7 +163,7 @@ class StreamingJobProgressListenerSuite
     val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None,
       Map.empty)
 
-    for(_ <- 0 until (limit + 10)) {
+    for (_ <- 0 until (limit + 10)) {
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
 
@@ -177,7 +177,7 @@ class StreamingJobProgressListenerSuite
     val listener = new StreamingJobProgressListener(ssc)
 
     // fulfill completedBatchInfos
-    for(i <- 0 until limit) {
+    for (i <- 0 until limit) {
       val batchInfoCompleted = BatchInfo(
         Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -205,13 +205,13 @@ class StreamingJobProgressListenerSuite
     batchUIData.get.outputOpIdSparkJobIdPairs.toSeq should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
 
     // A lot of "onBatchCompleted"s happen before "onJobStart"
-    for(i <- limit + 1 to limit * 2) {
+    for (i <- limit + 1 to limit * 2) {
       val batchInfoCompleted = BatchInfo(
         Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
 
-    for(i <- limit + 1 to limit * 2) {
+    for (i <- limit + 1 to limit * 2) {
       val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
       listener.onJobStart(jobStart)
     }