[SPARK-25299] Driver lifecycle API #533

Merged: 17 commits, May 7, 2019
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;

public interface ShuffleDataCleaner {

Does this really need to be separate from ShuffleDriverComponents? It just seems to add more levels of indirection to track.

Author:

I guess it doesn't, but I prefer it this way because it emphasizes that the cleaner runs only on the driver.


I mean, can't you just put removeShuffleData() directly in ShuffleDriverComponents, and then get rid of this interface completely? I think it would still be obvious it runs only on the driver.


void removeShuffleData(int shuffleId, boolean blocking) throws IOException;

}
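
A minimal sketch of the merged shape suggested in the thread above, with removeShuffleData() folded directly into ShuffleDriverComponents. This is only an illustration of the reviewer's proposal, not necessarily the shape the PR settles on:

/*
 * Sketch of the suggested alternative: inline the cleaner method and drop the
 * extra interface. It arguably stays obvious that this runs only on the
 * driver, since the whole interface is driver-side.
 */
package org.apache.spark.api.shuffle;

import java.io.IOException;

public interface ShuffleDriverComponents {

  void initializeApplication();

  void cleanupApplication() throws IOException;

  // Formerly ShuffleDataCleaner#removeShuffleData.
  void removeShuffleData(int shuffleId, boolean blocking) throws IOException;
}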
@@ -28,4 +28,5 @@
@Experimental
public interface ShuffleDataIO {
ShuffleExecutorComponents executor();
ShuffleDriverComponents driver();

Let's move driver above executor. I don't have a great reason for this except that it matches how I tend to think about Spark in general: the driver comes before the executors.

}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;

public interface ShuffleDriverComponents {

void initializeApplication();

Should you provide some parameter here? At least SparkConf seems like it would be useful.

(You can probably get it from SparkEnv but I always feel dirty calling that class.)


ShuffleDataIO is service-loaded by Spark, so its constructor can take a SparkConf as an argument. However, I've never been certain whether we should rely on that or pass the SparkConf directly as an argument to the initialization methods. The same question applies to the executor components initializing executors. Thoughts?


If that behavior is documented here, then it should be fine. Either one works.
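
For concreteness, a rough sketch of the explicit-argument option discussed above: the SparkConf is threaded through the lifecycle method instead of relying on the service-loaded plugin constructor (or on SparkEnv.get()). This is a hypothetical signature, not the one this PR adopts:

package org.apache.spark.api.shuffle;

import java.io.IOException;

import org.apache.spark.SparkConf;

public interface ShuffleDriverComponents {

  // The driver would call this once at startup (from SparkContext), handing
  // over its configuration explicitly rather than via the plugin constructor.
  void initializeApplication(SparkConf conf);

  void cleanupApplication() throws IOException;

  ShuffleDataCleaner dataCleaner();
}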


void cleanupApplication() throws IOException;

Any reason for this to throw IOException? I think we can go without any throws declaration for now.

Author:

Hmm, I guess I was expecting this to include something that would make a remote call or hit the file system. I can remove it for now.


ShuffleDataCleaner dataCleaner();
}
@@ -18,8 +18,10 @@
package org.apache.spark.shuffle.sort.io;

import org.apache.spark.SparkConf;
import org.apache.spark.api.shuffle.ShuffleDriverComponents;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleDataIO;
import org.apache.spark.shuffle.sort.lifecycle.DefaultShuffleDriverComponents;

public class DefaultShuffleDataIO implements ShuffleDataIO {

@@ -33,4 +35,9 @@ public DefaultShuffleDataIO(SparkConf sparkConf) {
public ShuffleExecutorComponents executor() {
return new DefaultShuffleExecutorComponents(sparkConf);
}

@Override
public ShuffleDriverComponents driver() {
return new DefaultShuffleDriverComponents();
}
}
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.lifecycle;

import org.apache.spark.api.shuffle.ShuffleDataCleaner;
import org.apache.spark.storage.BlockManagerMaster;

import java.io.IOException;

public class DefaultShuffleDataCleaner implements ShuffleDataCleaner {

private final BlockManagerMaster blockManagerMaster;

public DefaultShuffleDataCleaner(
BlockManagerMaster blockManagerMaster) {
this.blockManagerMaster = blockManagerMaster;
}

@Override
public void removeShuffleData(int shuffleId, boolean blocking) throws IOException {
blockManagerMaster.removeShuffle(shuffleId, blocking);
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.lifecycle;

import org.apache.spark.SparkEnv;
import org.apache.spark.api.shuffle.ShuffleDataCleaner;
import org.apache.spark.api.shuffle.ShuffleDriverComponents;
import org.apache.spark.storage.BlockManagerMaster;

public class DefaultShuffleDriverComponents implements ShuffleDriverComponents {

private BlockManagerMaster blockManagerMaster;

@Override
public void initializeApplication() {
blockManagerMaster = SparkEnv.get().blockManager().master();
}

I guess the idea is that, for a truly external shuffle service, here the driver would register with that service (including any authentication, etc.) and set up heartbeats?

@ifilonenko, Apr 12, 2019:

Yeah, that would be for non-default implementations, like the external implementation I experimented with in my old PR.
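
For illustration, a hypothetical non-default implementation along those lines. Everything here apart from the ShuffleDriverComponents and ShuffleDataCleaner interfaces is invented: RemoteShuffleClient and all calls on it are stand-ins for whatever RPC layer a real external shuffle service would provide:

package com.example.shuffle; // hypothetical, not part of this PR

import java.io.IOException;

import org.apache.spark.api.shuffle.ShuffleDataCleaner;
import org.apache.spark.api.shuffle.ShuffleDriverComponents;

// Invented stand-in for an external shuffle service's RPC client.
interface RemoteShuffleClient extends AutoCloseable {
  void registerApplication(String appId, String authToken);
  void startHeartbeats();
  void removeShuffle(int shuffleId);
  void unregisterApplication();
  @Override
  void close();
}

public class ExternalShuffleDriverComponents implements ShuffleDriverComponents {

  private final RemoteShuffleClient client;

  public ExternalShuffleDriverComponents(RemoteShuffleClient client) {
    this.client = client;
  }

  @Override
  public void initializeApplication() {
    // Register this application with the external service (authentication
    // included) and start heartbeating so the service can expire dead apps.
    client.registerApplication("spark-app-id", "auth-token"); // placeholders
    client.startHeartbeats();
  }

  @Override
  public void cleanupApplication() throws IOException {
    client.unregisterApplication();
    client.close();
  }

  @Override
  public ShuffleDataCleaner dataCleaner() {
    // ShuffleDataCleaner has a single abstract method, so a lambda suffices.
    return (shuffleId, blocking) -> client.removeShuffle(shuffleId);
  }
}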


@Override
public void cleanupApplication() {
// do nothing
}

@Override
public ShuffleDataCleaner dataCleaner() {
checkInitialized();
return new DefaultShuffleDataCleaner(blockManagerMaster);
}

private void checkInitialized() {
if (blockManagerMaster == null) {
throw new IllegalStateException("Driver components must be initialized before using");
}
}
}
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled

import scala.collection.JavaConverters._

import org.apache.spark.api.shuffle.ShuffleDataCleaner
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -58,7 +59,9 @@ private class CleanupTaskWeakReference(
* to be processed when the associated object goes out of scope of the application. Actual
* cleanup is performed in a separate daemon thread.
*/
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private[spark] class ContextCleaner(
sc: SparkContext,
shuffleDataCleaner: ShuffleDataCleaner) extends Logging {

/**
* A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
@@ -222,7 +225,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
blockManagerMaster.removeShuffle(shuffleId, blocking)
shuffleDataCleaner.removeShuffleData(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
@@ -270,7 +273,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
}
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.conda.CondaEnvironment
import org.apache.spark.api.conda.CondaEnvironment.CondaSetupInstructions
import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{CondaRunner, LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
@@ -215,6 +216,7 @@ class SparkContext(config: SparkConf) extends SafeLogging {
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -558,9 +560,16 @@ class SparkContext(config: SparkConf) extends SafeLogging {
}
_executorAllocationManager.foreach(_.start())

val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS)
val maybeIO = Utils.loadExtensions(
classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf)
require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses")
_shuffleDriverComponents = maybeIO.head.driver

driver() (i.e., call the method with parentheses: maybeIO.head.driver())

_shuffleDriverComponents.initializeApplication()

_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this))
Some(new ContextCleaner(this, _shuffleDriverComponents.dataCleaner()))
} else {
None
}
@@ -1950,6 +1959,7 @@ class SparkContext(config: SparkConf) extends SafeLogging {
}
_heartbeater = null
}
_shuffleDriverComponents.cleanupApplication()
if (env != null && _heartbeatReceiver != null) {
Utils.tryLogNonFatalError {
env.rpcEnv.stop(_heartbeatReceiver)
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.shuffle.sort.lifecycle.DefaultShuffleDataCleaner
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}


@@ -210,7 +211,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
/**
* A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
*/
private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
private class SaveAccumContextCleaner(sc: SparkContext) extends
ContextCleaner(sc, null) {

nit: indent more

private val accumsRegistered = new ArrayBuffer[Long]

override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {