@@ -244,7 +244,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
/* backlog */ 1,
InetAddress.getByName("localhost")))
// A call to accept() on the ServerSocket should block indefinitely.
-serverSocket.map(_.setSoTimeout(0))
+serverSocket.foreach(_.setSoTimeout(0))
new Thread("accept-connections") {
setDaemon(true)

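Every hunk in this diff makes the same substitution: a map call whose result is discarded becomes foreach, which exists precisely for side-effect-only traversal. A minimal sketch of the Option case above (assuming serverSocket is an Option[ServerSocket], as the calls suggest; the rest is invented for illustration):

import java.net.ServerSocket

object OptionForeachSketch {
  def main(args: Array[String]): Unit = {
    val serverSocket: Option[ServerSocket] = Some(new ServerSocket(0))

    // map applies the function and wraps its result, producing a
    // Some(()) of type Option[Unit] that nobody ever reads.
    serverSocket.map(_.setSoTimeout(0))

    // foreach applies the function for its side effect and returns Unit,
    // which states the intent directly.
    serverSocket.foreach(_.setSoTimeout(0))
  }
}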
@@ -246,7 +246,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
dataFileWriter.create(schema, new File(avroFile))
val logicalType = LogicalTypes.decimal(precision, scale)

-decimalInputData.map { x =>
+decimalInputData.foreach { x =>
val avroRec = new GenericData.Record(schema)
val decimal = new java.math.BigDecimal(x).setScale(scale)
val bytes =
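For collections, as in the Avro hunk above, the same reasoning carries an extra cost: map builds an entire new collection of results before it is thrown away. A small sketch (the sample data is invented for illustration):

object SeqForeachSketch {
  def main(args: Array[String]): Unit = {
    val decimalInputData = Seq("1.23", "4.56", "7.89")

    // map allocates a Seq[Unit] with one element per input, then discards it.
    decimalInputData.map { x => println(x) }

    // foreach runs the body for each element and allocates no result collection.
    decimalInputData.foreach { x => println(x) }
  }
}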
@@ -138,7 +138,7 @@ private[spark] object MesosSchedulerBackendUtil extends Logging {
val containerInfo = ContainerInfo.newBuilder()
.setType(containerType)

-conf.get(EXECUTOR_DOCKER_IMAGE).map { image =>
+conf.get(EXECUTOR_DOCKER_IMAGE).foreach { image =>
val forcePullImage = conf
.get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true)

@@ -263,7 +263,7 @@ private[yarn] class YarnAllocator(
private def getPendingAtLocation(
location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
-rpIdToResourceProfile.keys.map { id =>
+rpIdToResourceProfile.keys.foreach { id =>
val profResource = rpIdToYarnResource.get(id)
val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
.asScala.flatMap(_.asScala)
@@ -685,7 +685,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")),
OutputMode.Append())

-Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
assertFailOnGlobalWatermarkLimit(
s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
streamRelation.join(
@@ -718,7 +718,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
OutputMode.Append())

-Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
assertPassOnGlobalWatermarkLimit(
s"$joinType join after deduplicate in Append mode",
streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType,
@@ -199,7 +199,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
// Remove the cache entry before creating a new one.
cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
}
-needToRecache.map { cd =>
+needToRecache.foreach { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
spark, forceDisableConfigs)
@@ -32,8 +32,8 @@ case class SetCatalogAndNamespaceExec(
override protected def run(): Seq[InternalRow] = {
// The catalog is updated first because CatalogManager resets the current namespace
// when the current catalog is set.
-catalogName.map(catalogManager.setCurrentCatalog)
-namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray))
+catalogName.foreach(catalogManager.setCurrentCatalog)
+namespace.foreach(ns => catalogManager.setCurrentNamespace(ns.toArray))

Seq.empty
}
@@ -127,7 +127,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
var tableComment: String = ""
var tableProperties: String = ""
if (!properties.isEmpty) {
-properties.asScala.map {
+properties.asScala.foreach {
case (k, v) => k match {
case TableCatalog.PROP_COMMENT => tableComment = v
case TableCatalog.PROP_PROVIDER =>
@@ -226,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case Array(db) if !namespaceExists(namespace) =>
var comment = ""
if (!metadata.isEmpty) {
-metadata.asScala.map {
+metadata.asScala.foreach {
case (k, v) => k match {
case SupportsNamespaces.PROP_COMMENT => comment = v
case SupportsNamespaces.PROP_OWNER => // ignore
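The two JDBCTableCatalog hunks traverse a java.util.Map through asScala purely to populate local variables, again a side-effect-only loop. A hedged sketch of that shape (property keys and values invented for illustration):

import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters._ on older Scala

object PropertiesScanSketch {
  def main(args: Array[String]): Unit = {
    val properties = new java.util.HashMap[String, String]()
    properties.put("comment", "demo table")

    var tableComment: String = ""
    // foreach with a case pattern destructures each (key, value) entry;
    // map would do the same work while also building a discarded collection.
    properties.asScala.foreach {
      case ("comment", v) => tableComment = v
      case _ => // ignore other keys
    }
    println(tableComment)
  }
}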
@@ -46,7 +46,7 @@ class PassThroughSuite extends SparkFunSuite {

val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough)

-input.map { value =>
+input.foreach { value =>
val row = new GenericInternalRow(1)
columnType.setField(row, 0, value)
builder.appendFrom(row, 0)
@@ -98,7 +98,7 @@
val row = new GenericInternalRow(1)
val nullRow = new GenericInternalRow(1)
nullRow.setNullAt(0)
-input.map { value =>
+input.foreach { value =>
if (value == nullValue) {
builder.appendFrom(nullRow, 0)
} else {
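Discarded map results like these are easy to miss in review; scalac can flag them if the build enables value-discard warnings. A hedged build sketch (flag names vary by compiler version):

// build.sbt
// Scala 2.13: warn whenever a non-Unit value is silently discarded,
// e.g. the Option[Unit] produced by calling .map purely for side effects.
scalacOptions += "-Wvalue-discard"
// On Scala 2.12 the equivalent flag is -Ywarn-value-discard.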