diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 39c4abea933a7..820cb798050f0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -230,7 +230,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( /* backlog */ 1, InetAddress.getByName("localhost"))) // A call to accept() for ServerSocket shall block infinitely. - serverSocket.map(_.setSoTimeout(0)) + serverSocket.foreach(_.setSoTimeout(0)) new Thread("accept-connections") { setDaemon(true) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index 82569653c1f23..12ebddf72b03d 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -246,7 +246,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { dataFileWriter.create(schema, new File(avroFile)) val logicalType = LogicalTypes.decimal(precision, scale) - decimalInputData.map { x => + decimalInputData.foreach { x => val avroRec = new GenericData.Record(schema) val decimal = new java.math.BigDecimal(x).setScale(scale) val bytes = diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 7b2f6a2535eda..402d80f6dd2fb 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -137,7 +137,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { val containerInfo = ContainerInfo.newBuilder() .setType(containerType) - conf.get(EXECUTOR_DOCKER_IMAGE).map { image => + conf.get(EXECUTOR_DOCKER_IMAGE).foreach { image => val forcePullImage = conf .get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 3ec6fdeedd4b8..c3bb7bfc80fcf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -694,7 +694,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { OutputMode.Append(), expectFailure = expectFailure) - Seq(Inner, LeftOuter, RightOuter).map { joinType2 => + Seq(Inner, LeftOuter, RightOuter).foreach { joinType2 => testGlobalWatermarkLimit( s"streaming-stream $joinType2 after stream-stream $joinType join in Append mode", streamRelation.join( @@ -740,7 +740,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")), OutputMode.Append()) - Seq(Inner, LeftOuter, RightOuter).map { joinType => + Seq(Inner, LeftOuter, RightOuter).foreach { joinType => assertFailOnGlobalWatermarkLimit( s"stream-stream $joinType after FlatMapGroupsWithState in Append mode", streamRelation.join( @@ -773,7 +773,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")), OutputMode.Append()) - Seq(Inner, LeftOuter, RightOuter).map { joinType => + Seq(Inner, LeftOuter, RightOuter).foreach { joinType => assertPassOnGlobalWatermarkLimit( s"$joinType join after deduplicate in Append mode", streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 7d86c48015406..cee8585e387a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -189,7 +189,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // Remove the cache entry before creating a new ones. cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd)) } - needToRecache.map { cd => + needToRecache.foreach { cd => cd.cachedRepresentation.cacheBuilder.clearCache() // Turn off AQE so that the outputPartitioning of the underlying plan can be leveraged. val sessionWithAqeOff = getOrCloneSessionWithAqeOff(spark) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala index 9e6f00e0923ea..b13cea266707b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala @@ -32,8 +32,8 @@ case class SetCatalogAndNamespaceExec( override protected def run(): Seq[InternalRow] = { // The catalog is updated first because CatalogManager resets the current namespace // when the current catalog is set. - catalogName.map(catalogManager.setCurrentCatalog) - namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray)) + catalogName.foreach(catalogManager.setCurrentCatalog) + namespace.foreach(ns => catalogManager.setCurrentNamespace(ns.toArray)) Seq.empty } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala index f946a6779ec95..c6fe64d1058ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala @@ -46,7 +46,7 @@ class PassThroughSuite extends SparkFunSuite { val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough) - input.map { value => + input.foreach { value => val row = new GenericInternalRow(1) columnType.setField(row, 0, value) builder.appendFrom(row, 0) @@ -98,7 +98,7 @@ class PassThroughSuite extends SparkFunSuite { val row = new GenericInternalRow(1) val nullRow = new GenericInternalRow(1) nullRow.setNullAt(0) - input.map { value => + input.foreach { value => if (value == nullValue) { builder.appendFrom(nullRow, 0) } else {