diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f49cb3c2b8836..8d9f2be6218e6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -244,7 +244,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         /* backlog */ 1,
         InetAddress.getByName("localhost")))
       // A call to accept() for ServerSocket shall block infinitely.
-      serverSocket.map(_.setSoTimeout(0))
+      serverSocket.foreach(_.setSoTimeout(0))
       new Thread("accept-connections") {
         setDaemon(true)
 
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 82569653c1f23..12ebddf72b03d 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -246,7 +246,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       dataFileWriter.create(schema, new File(avroFile))
       val logicalType = LogicalTypes.decimal(precision, scale)
 
-      decimalInputData.map { x =>
+      decimalInputData.foreach { x =>
         val avroRec = new GenericData.Record(schema)
         val decimal = new java.math.BigDecimal(x).setScale(scale)
         val bytes =
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a5a2611be3765..e9fbff730f07e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -138,7 +138,7 @@ private[spark] object MesosSchedulerBackendUtil extends Logging {
     val containerInfo = ContainerInfo.newBuilder()
       .setType(containerType)
 
-    conf.get(EXECUTOR_DOCKER_IMAGE).map { image =>
+    conf.get(EXECUTOR_DOCKER_IMAGE).foreach { image =>
       val forcePullImage = conf
         .get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true)
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ac50c1c77a24e..f236fc39f61fa 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -263,7 +263,7 @@ private[yarn] class YarnAllocator(
   private def getPendingAtLocation(
       location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
     val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
-    rpIdToResourceProfile.keys.map { id =>
+    rpIdToResourceProfile.keys.foreach { id =>
      val profResource = rpIdToYarnResource.get(id)
      val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
        .asScala.flatMap(_.asScala)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index cdc3f4275414c..68536bccf7b2b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -685,7 +685,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")),
     OutputMode.Append())
 
-  Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+  Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
     assertFailOnGlobalWatermarkLimit(
       s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
       streamRelation.join(
@@ -718,7 +718,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
     OutputMode.Append())
 
-  Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+  Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
     assertPassOnGlobalWatermarkLimit(
       s"$joinType join after deduplicate in Append mode",
       streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f163d85914bc9..fddd12614e254 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -199,7 +199,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       // Remove the cache entry before creating a new ones.
       cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
     }
-    needToRecache.map { cd =>
+    needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
         spark, forceDisableConfigs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
index 9e6f00e0923ea..b13cea266707b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
@@ -32,8 +32,8 @@ case class SetCatalogAndNamespaceExec(
   override protected def run(): Seq[InternalRow] = {
     // The catalog is updated first because CatalogManager resets the current namespace
     // when the current catalog is set.
-    catalogName.map(catalogManager.setCurrentCatalog)
-    namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray))
+    catalogName.foreach(catalogManager.setCurrentCatalog)
+    namespace.foreach(ns => catalogManager.setCurrentNamespace(ns.toArray))
     Seq.empty
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 27558e5b0d61b..c4258fa264fdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -127,7 +127,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
     var tableComment: String = ""
     var tableProperties: String = ""
     if (!properties.isEmpty) {
-      properties.asScala.map {
+      properties.asScala.foreach {
         case (k, v) => k match {
           case TableCatalog.PROP_COMMENT => tableComment = v
           case TableCatalog.PROP_PROVIDER =>
@@ -226,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       case Array(db) if !namespaceExists(namespace) =>
         var comment = ""
         if (!metadata.isEmpty) {
-          metadata.asScala.map {
+          metadata.asScala.foreach {
             case (k, v) => k match {
               case SupportsNamespaces.PROP_COMMENT => comment = v
               case SupportsNamespaces.PROP_OWNER => // ignore
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
index f946a6779ec95..c6fe64d1058ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -46,7 +46,7 @@ class PassThroughSuite extends SparkFunSuite {
 
     val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough)
 
-    input.map { value =>
+    input.foreach { value =>
      val row = new GenericInternalRow(1)
      columnType.setField(row, 0, value)
      builder.appendFrom(row, 0)
@@ -98,7 +98,7 @@ class PassThroughSuite extends SparkFunSuite {
      val row = new GenericInternalRow(1)
      val nullRow = new GenericInternalRow(1)
      nullRow.setNullAt(0)
-     input.map { value =>
+     input.foreach { value =>
        if (value == nullValue) {
          builder.appendFrom(nullRow, 0)
        } else {
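
Every hunk above applies the same pattern: a .map call whose result is discarded is replaced with .foreach. The standalone Scala sketch below is illustrative only and not part of the patch (the object and value names MapVsForeach, names, and maybePort are made up): map on a Seq or Option allocates a new Seq[Unit] or Option[Unit] that is thrown away immediately and suggests a transformation that never happens, while foreach runs the side effect and returns Unit.

// Illustrative sketch only, not Spark code: prefer foreach over map when the
// function is called purely for its side effects and the result is unused.
object MapVsForeach {
  def main(args: Array[String]): Unit = {
    val names = Seq("alpha", "beta", "gamma")

    // Discouraged: map builds a Seq[Unit] that is discarded right away.
    names.map(n => println(n))

    // Preferred: foreach returns Unit and allocates no intermediate collection.
    names.foreach(n => println(n))

    // The same applies to Option: run the side effect only when a value is
    // present, without wrapping the Unit result in Some(()).
    val maybePort: Option[Int] = Some(8080)
    maybePort.foreach(p => println(s"binding to port " + p))
  }
}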