@@ -230,7 +230,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
/* backlog */ 1,
InetAddress.getByName("localhost")))
// A call to accept() for ServerSocket shall block infinitely.
-      serverSocket.map(_.setSoTimeout(0))
+      serverSocket.foreach(_.setSoTimeout(0))
new Thread("accept-connections") {
setDaemon(true)

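Reviewer note (not part of the diff): on an Option, map is a transformation, so calling it only for a side effect builds an Option[Unit] that is immediately discarded and obscures intent; foreach returns Unit and states the side effect directly. A minimal, self-contained sketch of the idiom, with a made-up socket value for illustration:

    import java.net.ServerSocket

    // Hypothetical value; port 0 binds an ephemeral port.
    val serverSocket: Option[ServerSocket] = Some(new ServerSocket(0))

    // Discouraged: map produces an Option[Unit] that nobody reads.
    serverSocket.map(_.setSoTimeout(0))

    // Preferred: foreach runs the body for its effect and returns Unit.
    serverSocket.foreach(_.setSoTimeout(0))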
@@ -246,7 +246,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
dataFileWriter.create(schema, new File(avroFile))
val logicalType = LogicalTypes.decimal(precision, scale)

-    decimalInputData.map { x =>
+    decimalInputData.foreach { x =>
val avroRec = new GenericData.Record(schema)
val decimal = new java.math.BigDecimal(x).setScale(scale)
val bytes =
@@ -137,7 +137,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
val containerInfo = ContainerInfo.newBuilder()
.setType(containerType)

-    conf.get(EXECUTOR_DOCKER_IMAGE).map { image =>
+    conf.get(EXECUTOR_DOCKER_IMAGE).foreach { image =>
val forcePullImage = conf
.get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true)

@@ -694,7 +694,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
OutputMode.Append(),
expectFailure = expectFailure)

-    Seq(Inner, LeftOuter, RightOuter).map { joinType2 =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType2 =>
testGlobalWatermarkLimit(
s"streaming-stream $joinType2 after stream-stream $joinType join in Append mode",
streamRelation.join(
@@ -740,7 +740,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")),
OutputMode.Append())

-    Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
assertFailOnGlobalWatermarkLimit(
s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
streamRelation.join(
@@ -773,7 +773,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
OutputMode.Append())

-    Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
assertPassOnGlobalWatermarkLimit(
s"$joinType join after deduplicate in Append mode",
streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType,
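Reviewer note (not part of the diff): the same reasoning applies to collections. Seq.map materializes a whole transformed sequence, so calling it purely for side effects allocates a Seq[Unit] of the same length and throws it away; foreach traverses without building any result. A hedged sketch, where println stands in for the test helpers used above:

    val joinTypes = Seq("Inner", "LeftOuter", "RightOuter")

    // map allocates Seq((), (), ()) and discards it.
    joinTypes.map(t => println(s"testing $t join"))

    // foreach performs the traversal with no result collection.
    joinTypes.foreach(t => println(s"testing $t join"))

The compiler can also surface the pattern this PR cleans up: Scala 2.12's -Ywarn-value-discard flag (-Wvalue-discard in 2.13) warns whenever a non-Unit result is silently dropped.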
@@ -189,7 +189,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
// Remove the cache entry before creating a new ones.
cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
}
-    needToRecache.map { cd =>
+    needToRecache.foreach { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
// Turn off AQE so that the outputPartitioning of the underlying plan can be leveraged.
val sessionWithAqeOff = getOrCloneSessionWithAqeOff(spark)
@@ -32,8 +32,8 @@ case class SetCatalogAndNamespaceExec(
override protected def run(): Seq[InternalRow] = {
// The catalog is updated first because CatalogManager resets the current namespace
// when the current catalog is set.
-    catalogName.map(catalogManager.setCurrentCatalog)
-    namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray))
+    catalogName.foreach(catalogManager.setCurrentCatalog)
+    namespace.foreach(ns => catalogManager.setCurrentNamespace(ns.toArray))

Seq.empty
}
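Reviewer note (not part of the diff): the ordering comment in this hunk still holds after the change, since foreach calls are plain statements evaluated top to bottom and each body runs only when its Option is defined, so the catalog is always set before the namespace. A small illustration with hypothetical values, where println stands in for the catalogManager calls:

    val catalogName: Option[String] = Some("my_catalog")   // hypothetical
    val namespace: Option[Seq[String]] = Some(Seq("db"))   // hypothetical

    // Source order is preserved: the catalog update runs first.
    catalogName.foreach(c => println(s"setCurrentCatalog($c)"))
    namespace.foreach(ns => println(s"setCurrentNamespace(${ns.mkString(".")})"))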
@@ -46,7 +46,7 @@ class PassThroughSuite extends SparkFunSuite {

val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough)

-    input.map { value =>
+    input.foreach { value =>
val row = new GenericInternalRow(1)
columnType.setField(row, 0, value)
builder.appendFrom(row, 0)
@@ -98,7 +98,7 @@ class PassThroughSuite extends SparkFunSuite {
val row = new GenericInternalRow(1)
val nullRow = new GenericInternalRow(1)
nullRow.setNullAt(0)
-    input.map { value =>
+    input.foreach { value =>
if (value == nullValue) {
builder.appendFrom(nullRow, 0)
} else {