From bebeea47d8ff516663b7cc79e146ee11b99da373 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 5 Apr 2019 11:50:59 -0700 Subject: [PATCH 01/17] add api --- .../spark/api/shuffle/ShuffleDataCleaner.java | 26 ++++++++++++++++++ .../api/shuffle/ShuffleDriverComponents.java | 27 +++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java new file mode 100644 index 0000000000000..bb2b7ebd2540a --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; + +public interface ShuffleDataCleaner { + + void removeShuffleData(int shuffleId) throws IOException; + +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java new file mode 100644 index 0000000000000..ecb91525d6218 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.shuffle; + +public interface ShuffleDriverComponents { + + void initializeApplication(); + + void cleanupApplication(); + + ShuffleDataCleaner dataCleaner(); +} From f6e84de52dc586132a7cec0e51d0e01056491b4e Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 5 Apr 2019 17:41:12 -0700 Subject: [PATCH 02/17] initial impl --- .../shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java | 4 ++++ .../sort/lifecycle/DefaultShuffleDriverComponents.java | 4 ++++ 2 files changed, 8 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java new file mode 100644 index 0000000000000..362d386301f28 --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java @@ -0,0 +1,4 @@ +package org.apache.spark.shuffle.sort.lifecycle; + +public class DefaultShuffleDataCleaner { +} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java new file mode 100644 index 0000000000000..e23ff0dc2acbc --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -0,0 +1,4 @@ +package org.apache.spark.shuffle.sort.lifecycle; + +public class DefaultShuffleDriverComponents { +} From 6f41113893281a5271854bc55c838b69f8cbe712 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 5 Apr 2019 18:02:49 -0700 Subject: [PATCH 03/17] initial impl --- .../spark/api/shuffle/ShuffleDataIO.java | 1 + .../shuffle/sort/io/DefaultShuffleDataIO.java | 7 ++++ .../lifecycle/DefaultShuffleDataCleaner.java | 30 ++++++++++++++- .../DefaultShuffleDriverComponents.java | 38 ++++++++++++++++++- .../org/apache/spark/ContextCleaner.scala | 25 ++++-------- .../scala/org/apache/spark/SparkContext.scala | 10 ++++- .../apache/spark/ContextCleanerSuite.scala | 2 +- .../spark/InternalAccumulatorSuite.scala | 6 ++- 8 files changed, 95 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java index 4cb40f6dd00b8..ecf1c32660823 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -28,4 +28,5 @@ @Experimental public interface ShuffleDataIO { ShuffleExecutorComponents executor(); + ShuffleDriverComponents driver(); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java index 906600c0f15fc..41b2637f690e2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java @@ -18,8 +18,10 @@ package org.apache.spark.shuffle.sort.io; import org.apache.spark.SparkConf; +import org.apache.spark.api.shuffle.ShuffleDriverComponents; import org.apache.spark.api.shuffle.ShuffleExecutorComponents; import org.apache.spark.api.shuffle.ShuffleDataIO; +import 
org.apache.spark.shuffle.sort.lifecycle.DefaultShuffleDriverComponents; public class DefaultShuffleDataIO implements ShuffleDataIO { @@ -33,4 +35,9 @@ public DefaultShuffleDataIO(SparkConf sparkConf) { public ShuffleExecutorComponents executor() { return new DefaultShuffleExecutorComponents(sparkConf); } + + @Override + public ShuffleDriverComponents driver() { + return new DefaultShuffleDriverComponents(sparkConf); + } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java index 362d386301f28..b9d51b88c56d8 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java @@ -1,4 +1,32 @@ package org.apache.spark.shuffle.sort.lifecycle; -public class DefaultShuffleDataCleaner { +import org.apache.spark.api.shuffle.ShuffleDataCleaner; +import org.apache.spark.storage.BlockManagerMaster; + +import java.io.IOException; + +public class DefaultShuffleDataCleaner implements ShuffleDataCleaner { + + private final BlockManagerMaster blockManagerMaster; + + /** + * Whether the cleaning thread will block on shuffle cleanup tasks. + * + * When context cleaner is configured to block on every delete request, it can throw timeout + * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this + * parameter by default disables blocking on shuffle cleanups. Note that this does not affect + * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround, + * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is + * resolved. 
+ */ + private final boolean blockOnShuffleCleanup; + + public DefaultShuffleDataCleaner(BlockManagerMaster blockManagerMaster, boolean blockOnShuffleCleanup) { + this.blockManagerMaster = blockManagerMaster; + this.blockOnShuffleCleanup = blockOnShuffleCleanup; + } + @Override + public void removeShuffleData(int shuffleId) throws IOException { + blockManagerMaster.removeShuffle(shuffleId, blockOnShuffleCleanup); + } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index e23ff0dc2acbc..934e86eb5233a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -1,4 +1,40 @@ package org.apache.spark.shuffle.sort.lifecycle; -public class DefaultShuffleDriverComponents { +import org.apache.spark.SparkConf; +import org.apache.spark.SparkEnv; +import org.apache.spark.api.shuffle.ShuffleDataCleaner; +import org.apache.spark.api.shuffle.ShuffleDriverComponents; +import org.apache.spark.internal.config.package$; +import org.apache.spark.storage.BlockManagerMaster; + +public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { + + private final boolean blockOnShuffleCleanup; + private BlockManagerMaster blockManagerMaster; + + public DefaultShuffleDriverComponents(SparkConf sparkConf) { + this.blockOnShuffleCleanup = (boolean) sparkConf.get(package$.MODULE$.CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE()); + } + + @Override + public void initializeApplication() { + blockManagerMaster = SparkEnv.get().blockManager().master(); + } + + @Override + public void cleanupApplication() { + // do nothing + } + + @Override + public ShuffleDataCleaner dataCleaner() { + checkInitialized(); + return new DefaultShuffleDataCleaner(blockManagerMaster, blockOnShuffleCleanup); + } + + private void checkInitialized() { + if (blockManagerMaster == null) { + throw new IllegalStateException("Driver components must be initialized before using"); + } + } } diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 305ec46a364a2..641d15f1163e3 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.collection.JavaConverters._ +import org.apache.spark.api.shuffle.ShuffleDataCleaner import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -58,7 +59,9 @@ private class CleanupTaskWeakReference( * to be processed when the associated object goes out of scope of the application. Actual * cleanup is performed in a separate daemon thread. */ -private[spark] class ContextCleaner(sc: SparkContext) extends Logging { +private[spark] class ContextCleaner( + sc: SparkContext, + shuffleDataCleaner: ShuffleDataCleaner) extends Logging { /** * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they @@ -98,19 +101,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { */ private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING) - /** - * Whether the cleaning thread will block on shuffle cleanup tasks. 
- * - * When context cleaner is configured to block on every delete request, it can throw timeout - * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this - * parameter by default disables blocking on shuffle cleanups. Note that this does not affect - * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround, - * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is - * resolved. - */ - private val blockOnShuffleCleanupTasks = - sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE) - @volatile private var stopped = false /** Attach a listener object to get information of when objects are cleaned. */ @@ -188,7 +178,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => - doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) + doCleanupShuffle(shuffleId) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) case CleanAccum(accId) => @@ -218,11 +208,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } /** Perform shuffle cleanup. */ - def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { + def doCleanupShuffle(shuffleId: Int): Unit = { try { logDebug("Cleaning shuffle " + shuffleId) mapOutputTrackerMaster.unregisterShuffle(shuffleId) - blockManagerMaster.removeShuffle(shuffleId, blocking) + shuffleDataCleaner.removeShuffleData(shuffleId) listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) logInfo("Cleaned shuffle " + shuffleId) } catch { @@ -270,7 +260,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } } - private def blockManagerMaster = sc.env.blockManager.master private def broadcastManager = sc.env.broadcastManager private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 56c66e0e99db9..a8c3d7082ae19 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.conda.CondaEnvironment import org.apache.spark.api.conda.CondaEnvironment.CondaSetupInstructions +import org.apache.spark.api.shuffle.ShuffleDataIO import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{CondaRunner, LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} @@ -558,9 +559,16 @@ class SparkContext(config: SparkConf) extends SafeLogging { } _executorAllocationManager.foreach(_.start()) + val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS) + val maybeIO = Utils.loadExtensions( + classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) + require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") + val shuffleDriverComponents = maybeIO.head.driver + shuffleDriverComponents.initializeApplication() + _cleaner = if (_conf.get(CLEANER_REFERENCE_TRACKING)) { - Some(new ContextCleaner(this)) + Some(new ContextCleaner(this, shuffleDriverComponents.dataCleaner())) } else { None } diff --git 
a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index b9b47d4dcd252..a182133b27777 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -134,7 +134,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId)) // Explicit cleanup - shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true)) + shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId)) tester.assertCleanup() // Verify that shuffles can be re-executed after cleaning up diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 62824a5bec9d1..517fe57ac51b9 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.shuffle.sort.lifecycle.DefaultShuffleDataCleaner import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} @@ -206,11 +207,12 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { fail(s"unable to find internal accumulator called $TEST_ACCUM") } } - + /** * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup. */ - private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) { + private class SaveAccumContextCleaner(sc: SparkContext) extends + ContextCleaner(sc, null) { private val accumsRegistered = new ArrayBuffer[Long] override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { From 0d0f368361b8dd43f73e45d749b4db6334bf74af Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 5 Apr 2019 18:53:23 -0700 Subject: [PATCH 04/17] fix ups --- .../apache/spark/api/shuffle/ShuffleDriverComponents.java | 4 +++- .../shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java | 1 + .../sort/lifecycle/DefaultShuffleDriverComponents.java | 2 ++ core/src/main/scala/org/apache/spark/SparkContext.scala | 7 ++++--- .../scala/org/apache/spark/InternalAccumulatorSuite.scala | 2 +- 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java index ecb91525d6218..2625b3e0d001d 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java @@ -17,11 +17,13 @@ package org.apache.spark.api.shuffle; +import java.io.IOException; + public interface ShuffleDriverComponents { void initializeApplication(); - void cleanupApplication(); + void cleanupApplication() throws IOException; ShuffleDataCleaner dataCleaner(); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java index b9d51b88c56d8..dd30b2f805771 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java +++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java @@ -25,6 +25,7 @@ public DefaultShuffleDataCleaner(BlockManagerMaster blockManagerMaster, boolean this.blockManagerMaster = blockManagerMaster; this.blockOnShuffleCleanup = blockOnShuffleCleanup; } + @Override public void removeShuffleData(int shuffleId) throws IOException { blockManagerMaster.removeShuffle(shuffleId, blockOnShuffleCleanup); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index 934e86eb5233a..5d5a2c71a7987 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -7,6 +7,8 @@ import org.apache.spark.internal.config.package$; import org.apache.spark.storage.BlockManagerMaster; +import java.io.IOException; + public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { private final boolean blockOnShuffleCleanup; diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a8c3d7082ae19..3ee9acbb9e80c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -27,9 +27,8 @@ import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.HashMap import scala.language.implicitConversions -import scala.reflect.{classTag, ClassTag} +import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal - import com.google.common.collect.MapMaker import com.palantir.logsafe.{SafeArg, UnsafeArg} import org.apache.commons.lang3.SerializationUtils @@ -43,7 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.conda.CondaEnvironment import org.apache.spark.api.conda.CondaEnvironment.CondaSetupInstructions -import org.apache.spark.api.shuffle.ShuffleDataIO +import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{CondaRunner, LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} @@ -216,6 +215,7 @@ class SparkContext(config: SparkConf) extends SafeLogging { private var _shutdownHookRef: AnyRef = _ private var _statusStore: AppStatusStore = _ private var _heartbeater: Heartbeater = _ + private var _shuffleDriverComponents: ShuffleDriverComponents = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. 
These provide access to the internal state of the | @@ -1958,6 +1958,7 @@ class SparkContext(config: SparkConf) extends SafeLogging { } _heartbeater = null } + _shuffleDriverComponents.cleanupApplication() if (env != null && _heartbeatReceiver != null) { Utils.tryLogNonFatalError { env.rpcEnv.stop(_heartbeatReceiver) diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 517fe57ac51b9..bd9e33809726f 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -207,7 +207,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { fail(s"unable to find internal accumulator called $TEST_ACCUM") } } - + /** * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup. */ From 6cb592a812b865cb76902307877848f0e2c455e8 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 5 Apr 2019 19:00:47 -0700 Subject: [PATCH 05/17] scalastyle --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3ee9acbb9e80c..c21b8624d0fdb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -27,8 +27,9 @@ import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.HashMap import scala.language.implicitConversions -import scala.reflect.{ClassTag, classTag} +import scala.reflect.{classTag, ClassTag} import scala.util.control.NonFatal + import com.google.common.collect.MapMaker import com.palantir.logsafe.{SafeArg, UnsafeArg} import org.apache.commons.lang3.SerializationUtils From 8d362b990b3ca02df20a11c518abd6963124be7d Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 5 Apr 2019 19:01:56 -0700 Subject: [PATCH 06/17] style --- .../lifecycle/DefaultShuffleDataCleaner.java | 17 +++++++++++++++++ .../DefaultShuffleDriverComponents.java | 19 +++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java index dd30b2f805771..8c6d4b5e26971 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.shuffle.sort.lifecycle; import org.apache.spark.api.shuffle.ShuffleDataCleaner; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index 5d5a2c71a7987..3362abedf455e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.shuffle.sort.lifecycle; import org.apache.spark.SparkConf; @@ -7,8 +24,6 @@ import org.apache.spark.internal.config.package$; import org.apache.spark.storage.BlockManagerMaster; -import java.io.IOException; - public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { private final boolean blockOnShuffleCleanup; From d8194a45784eef208437ee581f586f2973b5bf51 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 10 Apr 2019 15:31:50 -0700 Subject: [PATCH 07/17] fix compile? --- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 5 ++--- .../scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 50ef4330ddc80..36eb29f33280d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -1766,8 +1766,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { */ private[spark] def cleanShuffleDependencies[T]( sc: SparkContext, - deps: Seq[Dependency[_]], - blocking: Boolean = false): Unit = { + deps: Seq[Dependency[_]]): Unit = { // If there is no reference tracking we skip clean up. 
sc.cleaner.foreach { cleaner => /** @@ -1776,7 +1775,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { def cleanEagerly(dep: Dependency[_]): Unit = { if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) { val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId - cleaner.doCleanupShuffle(shuffleId, blocking) + cleaner.doCleanupShuffle(shuffleId) } val rdd = dep.rdd val rddDeps = rdd.dependencies diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 2fc9754ecfe1e..c8246a52da54c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -968,7 +968,7 @@ class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach { val keysOnly = shuffled.keys val deps = keysOnly.dependencies keysOnly.count() - ALS.cleanShuffleDependencies(sc, deps, true) + ALS.cleanShuffleDependencies(sc, deps) val resultingFiles = getAllFiles assert(resultingFiles === Set()) // Ensure running count again works fine even if we kill the shuffle files. From 4ee7954abf30b4ca538f0483e74de8a4053f0eb9 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 10 Apr 2019 16:41:30 -0700 Subject: [PATCH 08/17] style --- .../shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java | 4 +++- .../sort/lifecycle/DefaultShuffleDriverComponents.java | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java index 8c6d4b5e26971..d1a3f0c644b37 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java @@ -38,7 +38,9 @@ public class DefaultShuffleDataCleaner implements ShuffleDataCleaner { */ private final boolean blockOnShuffleCleanup; - public DefaultShuffleDataCleaner(BlockManagerMaster blockManagerMaster, boolean blockOnShuffleCleanup) { + public DefaultShuffleDataCleaner( + BlockManagerMaster blockManagerMaster, + boolean blockOnShuffleCleanup) { this.blockManagerMaster = blockManagerMaster; this.blockOnShuffleCleanup = blockOnShuffleCleanup; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index 3362abedf455e..fb879b287c2f5 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -30,7 +30,8 @@ public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { private BlockManagerMaster blockManagerMaster; public DefaultShuffleDriverComponents(SparkConf sparkConf) { - this.blockOnShuffleCleanup = (boolean) sparkConf.get(package$.MODULE$.CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE()); + this.blockOnShuffleCleanup = + (boolean) sparkConf.get(package$.MODULE$.CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE()); } @Override From b3701728e98f19ea52f56c86b5329c02b9bc5fc6 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 10 Apr 2019 16:47:10 -0700 Subject: [PATCH 09/17] oops --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- 1 file changed, 
3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c21b8624d0fdb..be71a623e6c3d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -564,12 +564,12 @@ class SparkContext(config: SparkConf) extends SafeLogging { val maybeIO = Utils.loadExtensions( classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") - val shuffleDriverComponents = maybeIO.head.driver - shuffleDriverComponents.initializeApplication() + _shuffleDriverComponents = maybeIO.head.driver + _shuffleDriverComponents.initializeApplication() _cleaner = if (_conf.get(CLEANER_REFERENCE_TRACKING)) { - Some(new ContextCleaner(this, shuffleDriverComponents.dataCleaner())) + Some(new ContextCleaner(this, _shuffleDriverComponents.dataCleaner())) } else { None } From 111d5ad920f846066450a4d7894872ce2380c46d Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 10 Apr 2019 19:18:47 -0700 Subject: [PATCH 10/17] add blocking flag back --- .../spark/api/shuffle/ShuffleDataCleaner.java | 2 +- .../shuffle/sort/io/DefaultShuffleDataIO.java | 2 +- .../lifecycle/DefaultShuffleDataCleaner.java | 20 +++---------------- .../DefaultShuffleDriverComponents.java | 8 +------- .../org/apache/spark/ContextCleaner.scala | 19 +++++++++++++++--- .../apache/spark/ml/recommendation/ALS.scala | 5 +++-- 6 files changed, 25 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java index bb2b7ebd2540a..36c579e546147 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java @@ -21,6 +21,6 @@ public interface ShuffleDataCleaner { - void removeShuffleData(int shuffleId) throws IOException; + void removeShuffleData(int shuffleId, boolean blocking) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java index 41b2637f690e2..7c124c1fe68bc 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleDataIO.java @@ -38,6 +38,6 @@ public ShuffleExecutorComponents executor() { @Override public ShuffleDriverComponents driver() { - return new DefaultShuffleDriverComponents(sparkConf); + return new DefaultShuffleDriverComponents(); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java index d1a3f0c644b37..e7e9bfdd0c8b0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java @@ -26,27 +26,13 @@ public class DefaultShuffleDataCleaner implements ShuffleDataCleaner { private final BlockManagerMaster blockManagerMaster; - /** - * Whether the cleaning thread will block on shuffle cleanup tasks. 
- * - * When context cleaner is configured to block on every delete request, it can throw timeout - * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this - * parameter by default disables blocking on shuffle cleanups. Note that this does not affect - * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround, - * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is - * resolved. - */ - private final boolean blockOnShuffleCleanup; - public DefaultShuffleDataCleaner( - BlockManagerMaster blockManagerMaster, - boolean blockOnShuffleCleanup) { + BlockManagerMaster blockManagerMaster) { this.blockManagerMaster = blockManagerMaster; - this.blockOnShuffleCleanup = blockOnShuffleCleanup; } @Override - public void removeShuffleData(int shuffleId) throws IOException { - blockManagerMaster.removeShuffle(shuffleId, blockOnShuffleCleanup); + public void removeShuffleData(int shuffleId, boolean blocking) throws IOException { + blockManagerMaster.removeShuffle(shuffleId, blocking); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index fb879b287c2f5..48a4da34d85b7 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -26,14 +26,8 @@ public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { - private final boolean blockOnShuffleCleanup; private BlockManagerMaster blockManagerMaster; - public DefaultShuffleDriverComponents(SparkConf sparkConf) { - this.blockOnShuffleCleanup = - (boolean) sparkConf.get(package$.MODULE$.CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE()); - } - @Override public void initializeApplication() { blockManagerMaster = SparkEnv.get().blockManager().master(); @@ -47,7 +41,7 @@ public void cleanupApplication() { @Override public ShuffleDataCleaner dataCleaner() { checkInitialized(); - return new DefaultShuffleDataCleaner(blockManagerMaster, blockOnShuffleCleanup); + return new DefaultShuffleDataCleaner(blockManagerMaster); } private void checkInitialized() { diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 641d15f1163e3..5f869e36d4d1d 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -101,6 +101,19 @@ private[spark] class ContextCleaner( */ private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING) + /** + * Whether the cleaning thread will block on shuffle cleanup tasks. + * + * When context cleaner is configured to block on every delete request, it can throw timeout + * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this + * parameter by default disables blocking on shuffle cleanups. Note that this does not affect + * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround, + * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is + * resolved. + */ + private val blockOnShuffleCleanupTasks = + sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE) + @volatile private var stopped = false /** Attach a listener object to get information of when objects are cleaned. 
*/ @@ -178,7 +191,7 @@ private[spark] class ContextCleaner( case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => - doCleanupShuffle(shuffleId) + doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) case CleanAccum(accId) => @@ -208,11 +221,11 @@ private[spark] class ContextCleaner( } /** Perform shuffle cleanup. */ - def doCleanupShuffle(shuffleId: Int): Unit = { + def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { try { logDebug("Cleaning shuffle " + shuffleId) mapOutputTrackerMaster.unregisterShuffle(shuffleId) - shuffleDataCleaner.removeShuffleData(shuffleId) + shuffleDataCleaner.removeShuffleData(shuffleId, blocking) listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) logInfo("Cleaned shuffle " + shuffleId) } catch { diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 36eb29f33280d..50ef4330ddc80 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -1766,7 +1766,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { */ private[spark] def cleanShuffleDependencies[T]( sc: SparkContext, - deps: Seq[Dependency[_]]): Unit = { + deps: Seq[Dependency[_]], + blocking: Boolean = false): Unit = { // If there is no reference tracking we skip clean up. sc.cleaner.foreach { cleaner => /** @@ -1775,7 +1776,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { def cleanEagerly(dep: Dependency[_]): Unit = { if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) { val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId - cleaner.doCleanupShuffle(shuffleId) + cleaner.doCleanupShuffle(shuffleId, blocking) } val rdd = dep.rdd val rddDeps = rdd.dependencies From 484cf3fab2198250b1772263904b750e10d3d001 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 10 Apr 2019 21:07:59 -0700 Subject: [PATCH 11/17] cleanup --- core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 2 +- .../scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index a182133b27777..b9b47d4dcd252 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -134,7 +134,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId)) // Explicit cleanup - shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId)) + shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true)) tester.assertCleanup() // Verify that shuffles can be re-executed after cleaning up diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index c8246a52da54c..2fc9754ecfe1e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -968,7 +968,7 @@ class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach { val keysOnly = shuffled.keys val deps = 
keysOnly.dependencies keysOnly.count() - ALS.cleanShuffleDependencies(sc, deps) + ALS.cleanShuffleDependencies(sc, deps, true) val resultingFiles = getAllFiles assert(resultingFiles === Set()) // Ensure running count again works fine even if we kill the shuffle files. From 0a94436229b2062e69ac784a46a7dcd5f01b1573 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 11 Apr 2019 11:09:24 -0700 Subject: [PATCH 12/17] remove unused imports --- .../shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index 48a4da34d85b7..82696a2dab431 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -17,11 +17,9 @@ package org.apache.spark.shuffle.sort.lifecycle; -import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; import org.apache.spark.api.shuffle.ShuffleDataCleaner; import org.apache.spark.api.shuffle.ShuffleDriverComponents; -import org.apache.spark.internal.config.package$; import org.apache.spark.storage.BlockManagerMaster; public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { From 2dd55196ef7b038d6e640c1b7e112c380b57ed99 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 15 Apr 2019 18:06:35 -0700 Subject: [PATCH 13/17] address comments --- .../spark/api/shuffle/ShuffleDataCleaner.java | 26 ------------- .../spark/api/shuffle/ShuffleDataIO.java | 2 +- .../api/shuffle/ShuffleDriverComponents.java | 2 +- .../lifecycle/DefaultShuffleDataCleaner.java | 38 ------------------- .../DefaultShuffleDriverComponents.java | 7 ++-- .../org/apache/spark/ContextCleaner.scala | 6 +-- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/InternalAccumulatorSuite.scala | 1 - 8 files changed, 10 insertions(+), 74 deletions(-) delete mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java delete mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java deleted file mode 100644 index 36c579e546147..0000000000000 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataCleaner.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.api.shuffle; - -import java.io.IOException; - -public interface ShuffleDataCleaner { - - void removeShuffleData(int shuffleId, boolean blocking) throws IOException; - -} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java index ecf1c32660823..e7649339f8ba8 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -27,6 +27,6 @@ */ @Experimental public interface ShuffleDataIO { - ShuffleExecutorComponents executor(); ShuffleDriverComponents driver(); + ShuffleExecutorComponents executor(); } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java index 2625b3e0d001d..3f82c56a1c249 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java @@ -25,5 +25,5 @@ public interface ShuffleDriverComponents { void cleanupApplication() throws IOException; - ShuffleDataCleaner dataCleaner(); + void removeShuffleData(int shuffleId, boolean blocking) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java deleted file mode 100644 index e7e9bfdd0c8b0..0000000000000 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDataCleaner.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.shuffle.sort.lifecycle; - -import org.apache.spark.api.shuffle.ShuffleDataCleaner; -import org.apache.spark.storage.BlockManagerMaster; - -import java.io.IOException; - -public class DefaultShuffleDataCleaner implements ShuffleDataCleaner { - - private final BlockManagerMaster blockManagerMaster; - - public DefaultShuffleDataCleaner( - BlockManagerMaster blockManagerMaster) { - this.blockManagerMaster = blockManagerMaster; - } - - @Override - public void removeShuffleData(int shuffleId, boolean blocking) throws IOException { - blockManagerMaster.removeShuffle(shuffleId, blocking); - } -} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index 82696a2dab431..80a2d850d0cf7 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -18,10 +18,11 @@ package org.apache.spark.shuffle.sort.lifecycle; import org.apache.spark.SparkEnv; -import org.apache.spark.api.shuffle.ShuffleDataCleaner; import org.apache.spark.api.shuffle.ShuffleDriverComponents; import org.apache.spark.storage.BlockManagerMaster; +import java.io.IOException; + public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { private BlockManagerMaster blockManagerMaster; @@ -37,9 +38,9 @@ public void cleanupApplication() { } @Override - public ShuffleDataCleaner dataCleaner() { + public void removeShuffleData(int shuffleId, boolean blocking) throws IOException { checkInitialized(); - return new DefaultShuffleDataCleaner(blockManagerMaster); + blockManagerMaster.removeShuffle(shuffleId, blocking); } private void checkInitialized() { diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 5f869e36d4d1d..fa28e54116d25 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.collection.JavaConverters._ -import org.apache.spark.api.shuffle.ShuffleDataCleaner +import org.apache.spark.api.shuffle.ShuffleDriverComponents import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -61,7 +61,7 @@ private class CleanupTaskWeakReference( */ private[spark] class ContextCleaner( sc: SparkContext, - shuffleDataCleaner: ShuffleDataCleaner) extends Logging { + shuffleDriverComponents: ShuffleDriverComponents) extends Logging { /** * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they @@ -225,7 +225,7 @@ private[spark] class ContextCleaner( try { logDebug("Cleaning shuffle " + shuffleId) mapOutputTrackerMaster.unregisterShuffle(shuffleId) - shuffleDataCleaner.removeShuffleData(shuffleId, blocking) + shuffleDriverComponents.removeShuffleData(shuffleId, blocking) listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) logInfo("Cleaned shuffle " + shuffleId) } catch { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index be71a623e6c3d..82d20e17c29de 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ 
b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -569,7 +569,7 @@ class SparkContext(config: SparkConf) extends SafeLogging { _cleaner = if (_conf.get(CLEANER_REFERENCE_TRACKING)) { - Some(new ContextCleaner(this, _shuffleDriverComponents.dataCleaner())) + Some(new ContextCleaner(this, _shuffleDriverComponents)) } else { None } diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index bd9e33809726f..82acde3da5f9f 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.shuffle.sort.lifecycle.DefaultShuffleDataCleaner import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} From 29b9a6c1872e649f096fcc92215e3b72e9b2b29d Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 19 Apr 2019 21:35:29 -0700 Subject: [PATCH 14/17] add sparkconf setting in driver and test --- .../api/shuffle/ShuffleDriverComponents.java | 6 +- .../DefaultShuffleDriverComponents.java | 5 +- .../scala/org/apache/spark/SparkContext.scala | 18 ++--- .../shuffle/ShuffleDriverComponentsTest.scala | 70 +++++++++++++++++++ 4 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java index 3f82c56a1c249..6a0ec8d44fd4f 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java @@ -18,10 +18,14 @@ package org.apache.spark.api.shuffle; import java.io.IOException; +import java.util.Map; public interface ShuffleDriverComponents { - void initializeApplication(); + /** + * @return additional SparkConf values necessary for the executors. 
+ */ + Map initializeApplication(); void cleanupApplication() throws IOException; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index 80a2d850d0cf7..a3eddc8ec930e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -17,19 +17,22 @@ package org.apache.spark.shuffle.sort.lifecycle; +import com.google.common.collect.ImmutableMap; import org.apache.spark.SparkEnv; import org.apache.spark.api.shuffle.ShuffleDriverComponents; import org.apache.spark.storage.BlockManagerMaster; import java.io.IOException; +import java.util.Map; public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { private BlockManagerMaster blockManagerMaster; @Override - public void initializeApplication() { + public Map initializeApplication() { blockManagerMaster = SparkEnv.get().blockManager().master(); + return ImmutableMap.of(); } @Override diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 82d20e17c29de..afb9ef2c260d2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -400,6 +400,13 @@ class SparkContext(config: SparkConf) extends SafeLogging { _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER) + val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS) + val maybeIO = Utils.loadExtensions( + classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) + require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") + _shuffleDriverComponents = maybeIO.head.driver + _shuffleDriverComponents.initializeApplication().asScala.foreach(x => _conf.set(x._1, x._2)) + _jars = Utils.getUserJars(_conf) _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)) .toSeq.flatten @@ -429,6 +436,8 @@ class SparkContext(config: SparkConf) extends SafeLogging { _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource) listenerBus.addToStatusQueue(_statusStore.listener.get) + _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) + // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) @@ -460,8 +469,6 @@ class SparkContext(config: SparkConf) extends SafeLogging { // the bound port to the cluster manager properly _ui.foreach(_.bind()) - _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) - // Add each JAR given through the constructor if (jars != null) { jars.foreach(addJar) @@ -560,13 +567,6 @@ class SparkContext(config: SparkConf) extends SafeLogging { } _executorAllocationManager.foreach(_.start()) - val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS) - val maybeIO = Utils.loadExtensions( - classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) - require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") - _shuffleDriverComponents = maybeIO.head.driver - _shuffleDriverComponents.initializeApplication() - _cleaner = if (_conf.get(CLEANER_REFERENCE_TRACKING)) { Some(new ContextCleaner(this, _shuffleDriverComponents)) diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala new file mode 100644 index 0000000000000..9cb46b06e182f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.util + +import com.google.common.collect.ImmutableMap + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite} +import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents, ShuffleWriteSupport} +import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS +import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport + +class TestShuffleDriverComponents extends ShuffleDriverComponents { + override def initializeApplication(): util.Map[String, String] = + ImmutableMap.of("spark.test.key", "spark.test.value") + + override def cleanupApplication(): Unit = {} + + override def removeShuffleData(shuffleId: Int, blocking: Boolean): Unit = {} +} + +class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO { + override def driver(): ShuffleDriverComponents = new TestShuffleDriverComponents() + + override def executor(): ShuffleExecutorComponents = + new TestShuffleExecutorComponents(sparkConf) +} + +class TestShuffleExecutorComponents(sparkConf: SparkConf) extends ShuffleExecutorComponents { + override def initializeExecutor(appId: String, execId: String): Unit = { + assert(sparkConf.get("spark.test.key").equals("spark.test.value")) + } + + override def writes(): ShuffleWriteSupport = { + val blockManager = SparkEnv.get.blockManager + val blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager) + new DefaultShuffleWriteSupport(sparkConf, blockResolver) + } +} + +class ShuffleDriverComponentsTest extends SparkFunSuite with LocalSparkContext { + test(s"test serialization of shuffle initialization conf to executors") { + val testConf = new SparkConf() + .setAppName("testing") + .setMaster("local-cluster[2,1,1024]") + .set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO") + + sc = new SparkContext(testConf) + + sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3) + .groupByKey() + .collect() + } +} From 89d0a6cbf06c18fe95b261a705cd02402eb952db Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 22 Apr 2019 11:15:53 -0700 Subject: [PATCH 15/17] fix tests? 
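
Move the ShuffleDataIO plugin loading later in SparkContext initialization so that initializeApplication() runs after the executor environment is populated (and just before the HeartbeatReceiver endpoint is registered), and restore _hadoopConfiguration to its original position after the UI is bound. Also rename ShuffleDriverComponentsTest to ShuffleDriverComponentsSuite to match the *Suite naming convention, moving the suite class above its test helpers.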
--- .../scala/org/apache/spark/SparkContext.scala | 18 +++++------ ...ala => ShuffleDriverComponentsSuite.scala} | 30 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) rename core/src/test/scala/org/apache/spark/shuffle/{ShuffleDriverComponentsTest.scala => ShuffleDriverComponentsSuite.scala} (97%) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index afb9ef2c260d2..d0af9b09e2053 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -400,13 +400,6 @@ class SparkContext(config: SparkConf) extends SafeLogging { _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER) - val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS) - val maybeIO = Utils.loadExtensions( - classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) - require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") - _shuffleDriverComponents = maybeIO.head.driver - _shuffleDriverComponents.initializeApplication().asScala.foreach(x => _conf.set(x._1, x._2)) - _jars = Utils.getUserJars(_conf) _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)) .toSeq.flatten @@ -436,8 +429,6 @@ class SparkContext(config: SparkConf) extends SafeLogging { _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource) listenerBus.addToStatusQueue(_statusStore.listener.get) - _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) - // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) @@ -469,6 +460,8 @@ class SparkContext(config: SparkConf) extends SafeLogging { // the bound port to the cluster manager properly _ui.foreach(_.bind()) + _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) + // Add each JAR given through the constructor if (jars != null) { jars.foreach(addJar) @@ -500,6 +493,13 @@ class SparkContext(config: SparkConf) extends SafeLogging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser + val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS) + val maybeIO = Utils.loadExtensions( + classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) + require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") + _shuffleDriverComponents = maybeIO.head.driver + _shuffleDriverComponents.initializeApplication().asScala.foreach(x => _conf.set(x._1, x._2)) + // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will // retrieve "HeartbeatReceiver" in the constructor. 
(SPARK-6640) _heartbeatReceiver = env.rpcEnv.setupEndpoint( diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala similarity index 97% rename from core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala rename to core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala index 9cb46b06e182f..2d23b38dcdc8a 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsTest.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala @@ -26,6 +26,21 @@ import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents, Shu import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport +class ShuffleDriverComponentsSuite extends SparkFunSuite with LocalSparkContext { + test(s"test serialization of shuffle initialization conf to executors") { + val testConf = new SparkConf() + .setAppName("testing") + .setMaster("local-cluster[2,1,1024]") + .set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO") + + sc = new SparkContext(testConf) + + sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3) + .groupByKey() + .collect() + } +} + class TestShuffleDriverComponents extends ShuffleDriverComponents { override def initializeApplication(): util.Map[String, String] = ImmutableMap.of("spark.test.key", "spark.test.value") @@ -53,18 +68,3 @@ class TestShuffleExecutorComponents(sparkConf: SparkConf) extends ShuffleExecuto new DefaultShuffleWriteSupport(sparkConf, blockResolver) } } - -class ShuffleDriverComponentsTest extends SparkFunSuite with LocalSparkContext { - test(s"test serialization of shuffle initialization conf to executors") { - val testConf = new SparkConf() - .setAppName("testing") - .setMaster("local-cluster[2,1,1024]") - .set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO") - - sc = new SparkContext(testConf) - - sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3) - .groupByKey() - .collect() - } -} From 121c112f29bed92afedb4c32425837aea3d8f02f Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 29 Apr 2019 16:26:20 -0700 Subject: [PATCH 16/17] pass extra configs through interface --- .../org/apache/spark/api/shuffle/ShuffleDataIO.java | 2 ++ .../spark/api/shuffle/ShuffleExecutorComponents.java | 4 +++- .../sort/io/DefaultShuffleExecutorComponents.java | 4 +++- .../src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- .../apache/spark/shuffle/sort/SortShuffleManager.scala | 10 +++++++++- .../spark/shuffle/ShuffleDriverComponentsSuite.scala | 7 ++++--- 6 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java index e7649339f8ba8..dd7c0ac7320cb 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -27,6 +27,8 @@ */ @Experimental public interface ShuffleDataIO { + String SHUFFLE_SPARK_CONF_PREFIX = "spark.shuffle.plugin."; + ShuffleDriverComponents driver(); ShuffleExecutorComponents executor(); } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java index 4fc20bad9938b..d6a017bce1878 100644 
--- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java @@ -19,6 +19,8 @@ import org.apache.spark.annotation.Experimental; +import java.util.Map; + /** * :: Experimental :: * An interface for building shuffle support for Executors @@ -27,7 +29,7 @@ */ @Experimental public interface ShuffleExecutorComponents { - void initializeExecutor(String appId, String execId); + void initializeExecutor(String appId, String execId, Map extraConfigs); ShuffleWriteSupport writes(); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleExecutorComponents.java index 76e87a6740259..bb2db97fa9c95 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleExecutorComponents.java @@ -24,6 +24,8 @@ import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.storage.BlockManager; +import java.util.Map; + public class DefaultShuffleExecutorComponents implements ShuffleExecutorComponents { private final SparkConf sparkConf; @@ -35,7 +37,7 @@ public DefaultShuffleExecutorComponents(SparkConf sparkConf) { } @Override - public void initializeExecutor(String appId, String execId) { + public void initializeExecutor(String appId, String execId, Map extraConfigs) { blockManager = SparkEnv.get().blockManager(); blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager); } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d0af9b09e2053..f41d1ce2cac25 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -498,7 +498,8 @@ class SparkContext(config: SparkConf) extends SafeLogging { classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") _shuffleDriverComponents = maybeIO.head.driver - _shuffleDriverComponents.initializeApplication().asScala.foreach(x => _conf.set(x._1, x._2)) + _shuffleDriverComponents.initializeApplication().asScala.foreach(x => + _conf.set(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX + x._1, x._2)) // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will // retrieve "HeartbeatReceiver" in the constructor. 
(SPARK-6640) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 5da7b5cb35e6d..8a6dbb728e71a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -19,6 +19,8 @@ package org.apache.spark.shuffle.sort import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ + import org.apache.spark._ import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleExecutorComponents} import org.apache.spark.internal.{config, Logging} @@ -219,7 +221,13 @@ private[spark] object SortShuffleManager extends Logging { classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") val executorComponents = maybeIO.head.executor() - executorComponents.initializeExecutor(conf.getAppId, SparkEnv.get.executorId) + val extraConfigs = conf.getAllWithPrefix(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX) + .map( e => (e._1.stripPrefix(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX), e._2)) + .toMap + executorComponents.initializeExecutor( + conf.getAppId, + SparkEnv.get.executorId, + extraConfigs.asJava) executorComponents } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala index 2d23b38dcdc8a..dd86f4c48c5f6 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala @@ -43,7 +43,7 @@ class ShuffleDriverComponentsSuite extends SparkFunSuite with LocalSparkContext class TestShuffleDriverComponents extends ShuffleDriverComponents { override def initializeApplication(): util.Map[String, String] = - ImmutableMap.of("spark.test.key", "spark.test.value") + ImmutableMap.of("test-key", "test-value") override def cleanupApplication(): Unit = {} @@ -58,8 +58,9 @@ class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO { } class TestShuffleExecutorComponents(sparkConf: SparkConf) extends ShuffleExecutorComponents { - override def initializeExecutor(appId: String, execId: String): Unit = { - assert(sparkConf.get("spark.test.key").equals("spark.test.value")) + override def initializeExecutor(appId: String, execId: String, + extraConfigs: util.Map[String, String]): Unit = { + assert(extraConfigs.get("test-key").equals("test-value")) } override def writes(): ShuffleWriteSupport = { From 6dc0a24e3e6baed5c02bb96879eeb1e1efacf3c1 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 1 May 2019 12:36:00 -0700 Subject: [PATCH 17/17] address comments --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- .../org/apache/spark/shuffle/sort/SortShuffleManager.scala | 1 - .../scala/org/apache/spark/InternalAccumulatorSuite.scala | 2 +- .../apache/spark/shuffle/ShuffleDriverComponentsSuite.scala | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f41d1ce2cac25..999f180193d84 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -497,9 +497,9 @@ class SparkContext(config: SparkConf) extends SafeLogging { val maybeIO = Utils.loadExtensions( 
classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf) require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") - _shuffleDriverComponents = maybeIO.head.driver - _shuffleDriverComponents.initializeApplication().asScala.foreach(x => - _conf.set(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX + x._1, x._2)) + _shuffleDriverComponents = maybeIO.head.driver() + _shuffleDriverComponents.initializeApplication().asScala.foreach { + case (k, v) => _conf.set(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX + k, v) } // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 8a6dbb728e71a..b5cd0fd558825 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -222,7 +222,6 @@ private[spark] object SortShuffleManager extends Logging { require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses") val executorComponents = maybeIO.head.executor() val extraConfigs = conf.getAllWithPrefix(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX) - .map( e => (e._1.stripPrefix(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX), e._2)) .toMap executorComponents.initializeExecutor( conf.getAppId, diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 82acde3da5f9f..28cbeeda7a88d 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -211,7 +211,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup. */ private class SaveAccumContextCleaner(sc: SparkContext) extends - ContextCleaner(sc, null) { + ContextCleaner(sc, null) { private val accumsRegistered = new ArrayBuffer[Long] override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala index dd86f4c48c5f6..dbb954945a8b6 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala @@ -60,7 +60,7 @@ class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO { class TestShuffleExecutorComponents(sparkConf: SparkConf) extends ShuffleExecutorComponents { override def initializeExecutor(appId: String, execId: String, extraConfigs: util.Map[String, String]): Unit = { - assert(extraConfigs.get("test-key").equals("test-value")) + assert(extraConfigs.get("test-key") == "test-value") } override def writes(): ShuffleWriteSupport = {
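
Taken together, the pieces above give a plugin a driver-to-executor configuration hand-off: whatever the driver's initializeApplication() returns is written into the SparkConf under the spark.shuffle.plugin. prefix, and SortShuffleManager reads every key under that prefix back out (getAllWithPrefix already strips the prefix) and passes the map to initializeExecutor on each executor. What follows is a minimal sketch of a third-party plugin that uses this hand-off; the Example* class names, the storage.endpoint key, and the endpoint value are made up for illustration, while the interface signatures and the wiring mirror the patches above.

package org.apache.spark.shuffle

// Hypothetical plugin sketch. Kept in this package, like the suite above, so it can
// reuse the default sort-based write path (IndexShuffleBlockResolver and friends).

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents, ShuffleWriteSupport}
import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport

// Driver side: publish a setting the executors will need. Each entry returned here is
// stored by SparkContext as spark.shuffle.plugin.<key> in the application's SparkConf.
class ExampleDriverComponents extends ShuffleDriverComponents {
  override def initializeApplication(): util.Map[String, String] =
    Map("storage.endpoint" -> "shuffle-store.example.com:9000").asJava

  override def cleanupApplication(): Unit = {}

  override def removeShuffleData(shuffleId: Int, blocking: Boolean): Unit = {}
}

// Executor side: the same entries come back with the prefix already stripped, because
// SortShuffleManager collects them via conf.getAllWithPrefix before calling this method.
class ExampleExecutorComponents(conf: SparkConf) extends ShuffleExecutorComponents {
  private var endpoint: String = _

  override def initializeExecutor(
      appId: String,
      execId: String,
      extraConfigs: util.Map[String, String]): Unit = {
    endpoint = extraConfigs.get("storage.endpoint")
    // ... open a client against `endpoint` here ...
  }

  // Reuse the default sort-based write path, as the test suite above does.
  override def writes(): ShuffleWriteSupport = {
    val blockResolver = new IndexShuffleBlockResolver(conf, SparkEnv.get.blockManager)
    new DefaultShuffleWriteSupport(conf, blockResolver)
  }
}

class ExampleShuffleDataIO(conf: SparkConf) extends ShuffleDataIO {
  override def driver(): ShuffleDriverComponents = new ExampleDriverComponents
  override def executor(): ShuffleExecutorComponents = new ExampleExecutorComponents(conf)
}

Enabling such a plugin works the same way as in the suite above: point SHUFFLE_IO_PLUGIN_CLASS at ExampleShuffleDataIO, and the driver publishes its entries during SparkContext startup while each executor receives them the first time SortShuffleManager loads the executor components.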