diff --git a/README.md b/README.md
index 10338c18..7b78b501 100644
--- a/README.md
+++ b/README.md
@@ -43,8 +43,8 @@ following configurations in spark-defaults.conf or Spark submit command line arg
 Note: For DAOS users, DAOS Hadoop/Java API jars should also be included in the classpath as we leverage DAOS Hadoop filesystem.
 ```
-spark.executor.extraClassPath     $HOME/miniconda2/envs/oapenv/oap_jars/remote-shuffle-.jar
-spark.driver.extraClassPath       $HOME/miniconda2/envs/oapenv/oap_jars/remote-shuffle-.jar
+spark.executor.extraClassPath     $HOME/miniconda2/envs/oapenv/oap_jars/shuffle-hadoop-.jar
+spark.driver.extraClassPath       $HOME/miniconda2/envs/oapenv/oap_jars/shuffle-hadoop-.jar
 ```
 
 Enable the remote shuffle manager and specify the Hadoop storage system URI holding shuffle data.
diff --git a/pom.xml b/pom.xml
index cb6a8f84..fe2d319f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
   com.intel.oap
   remote-shuffle-parent
-  1.2.0
+  1.2.1
   OAP Remote Shuffle Parent POM
   pom
@@ -16,7 +16,7 @@
   1.8
   ${java.version}
   ${java.version}
-  3.0.0
+  3.1.1
@@ -225,8 +225,8 @@
       org.scalatest
-      scalatest_2.12
-      3.0.8
+      scalatest_${scala.binary.version}
+      3.2.3
       test
diff --git a/shuffle-daos/pom.xml b/shuffle-daos/pom.xml
index 0626465e..9a8aa87f 100644
--- a/shuffle-daos/pom.xml
+++ b/shuffle-daos/pom.xml
@@ -7,7 +7,7 @@
     com.intel.oap
     remote-shuffle-parent
-    1.2.0
+    1.2.1
   shuffle-daos
   OAP Remote Shuffle Based on DAOS Object API
@@ -205,7 +205,7 @@
       io.daos
       daos-java
-      1.2.2
+      1.2.1-SNAPSHOT
       junit
@@ -241,4 +241,10 @@
+
+    maven-snapshots
+    http://oss.sonatype.org/content/repositories/snapshots
+
+
diff --git a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/DaosShuffleManager.scala b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/DaosShuffleManager.scala
index 30e870bf..cc49f04b 100644
--- a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/DaosShuffleManager.scala
+++ b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/DaosShuffleManager.scala
@@ -135,20 +135,6 @@ class DaosShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
   }
 
   override def getReader[K, C](
-      handle: ShuffleHandle,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter): DaosShuffleReader[K, C]
-    = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startPartition, endPartition)
-    new DaosShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context,
-      metrics, daosShuffleIO, SparkEnv.get.serializerManager,
-      shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
-  }
-
-  override def getReaderForRange[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
       endMapIndex: Int,
@@ -157,7 +143,7 @@ class DaosShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): DaosShuffleReader[K, C]
     = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange(
+    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
       handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
     new DaosShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context,
       metrics, daosShuffleIO, SparkEnv.get.serializerManager,
diff --git a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala
index 45a804f0..9c379a80 100644
--- a/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala
+++ b/shuffle-daos/src/main/scala/org/apache/spark/shuffle/daos/package.scala
@@ -170,7 +170,7 @@ package object daos {
       .intConf
       .checkValue(v => v > 0, s"async write batch size must be positive")
-      .createWithDefault(1)
+      .createWithDefault(30)
 
   val SHUFFLE_DAOS_READ_BATCH_SIZE =
     ConfigBuilder("spark.shuffle.daos.read.batch")
diff --git a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosReaderAsyncTest.java b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosReaderAsyncTest.java
index 5ccf029e..18b4bb20 100644
--- a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosReaderAsyncTest.java
+++ b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosReaderAsyncTest.java
@@ -128,19 +128,19 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
   public void testTwoEntries() throws Exception {
     LinkedHashMap, Tuple3> partSizeMap;
     partSizeMap = new LinkedHashMap<>();
-    long mapIds[] = new long[] {12345, 12346};
+    long[] mapIds = new long[] {12345, 12346};
     int reduceId = 6789;
-    long lens[] = new long[] {2 * 1024 * 1024, 1023};
+    long[] lens = new long[] {2 * 1024 * 1024, 1023};
     int shuffleId = 1000;
     long eqHandle = 1111L;
-    IOSimpleDDAsync descs[] = new IOSimpleDDAsync[] {Mockito.mock(IOSimpleDDAsync.class),
+    IOSimpleDDAsync[] descs = new IOSimpleDDAsync[] {Mockito.mock(IOSimpleDDAsync.class),
         Mockito.mock(IOSimpleDDAsync.class)};
-    IOSimpleDDAsync.AsyncEntry entries[] = new IOSimpleDDAsync.AsyncEntry[] {
+    IOSimpleDDAsync.AsyncEntry[] entries = new IOSimpleDDAsync.AsyncEntry[] {
         Mockito.mock(IOSimpleDDAsync.AsyncEntry.class),
         Mockito.mock(IOSimpleDDAsync.AsyncEntry.class)
     };
-    ByteBuf bufs[] = new ByteBuf[] {Mockito.mock(ByteBuf.class), Mockito.mock(ByteBuf.class)};
-    boolean readAlready[] = new boolean[] {false, false};
+    ByteBuf[] bufs = new ByteBuf[] {Mockito.mock(ByteBuf.class), Mockito.mock(ByteBuf.class)};
+    boolean[] readAlready = new boolean[] {false, false};
     for (int i = 0; i < 2; i++) {
       Mockito.when(entries[i].getFetchedData()).thenReturn(bufs[i]);
       Mockito.when(entries[i].getKey()).thenReturn(String.valueOf(mapIds[i]));
@@ -162,7 +162,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
     }
     Mockito.when(eq.getEqWrapperHdl()).thenReturn(eqHandle);
 
-    int times[] = new int[] {0};
+    int[] times = new int[] {0};
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -176,7 +176,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
       }
     }).when(eq).pollCompleted(Mockito.any(), Mockito.anyInt(), Mockito.anyLong());
 
-    int times2[] = new int[] {0};
+    int[] times2 = new int[] {0};
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
diff --git a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java
index 78664809..00f2cb09 100644
--- a/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java
+++ b/shuffle-daos/src/test/java/org/apache/spark/shuffle/daos/DaosShuffleIOTest.java
@@ -33,7 +33,6 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.mockito.internal.stubbing.answers.DoesNothing;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git a/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleReaderSuite.scala b/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleReaderSuite.scala
index 28ad614b..b5d5752b 100644
--- a/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleReaderSuite.scala
+++ b/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleReaderSuite.scala
@@ -144,7 +144,7 @@ class DaosShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
     val mapOutputTracker = mock(classOf[MapOutputTracker])
     val localBlockManagerId = BlockManagerId("test-client", "test-client", 1)
     when(mapOutputTracker.getMapSizesByExecutorId(
-      shuffleId, reduceId, reduceId + 1)).thenReturn {
+      shuffleId, reduceId)).thenReturn {
       // Test a scenario where all data is local, to avoid creating a bunch of additional mocks
       // for the code to read data over the network.
       val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId =>
@@ -172,7 +172,7 @@ class DaosShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
     val taskContext = TaskContext.empty()
     val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
     val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(
-      shuffleId, reduceId, reduceId + 1)
+      shuffleId, reduceId)
 
     val (daosReader, shuffleIO, daosObject) =
       if (singleCall) {
diff --git a/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterPerf.scala b/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterPerf.scala
index 2fc2eaed..a5b562b5 100644
--- a/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterPerf.scala
+++ b/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterPerf.scala
@@ -26,7 +26,6 @@ package org.apache.spark.shuffle.daos
 import org.mockito.{Mock, Mockito, MockitoAnnotations}
 import org.mockito.Answers._
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.Matchers
 
 import scala.collection.mutable
 import scala.util.Random
@@ -36,7 +35,7 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.spark.util.Utils
 
-class DaosShuffleWriterPerf extends SparkFunSuite with SharedSparkContext with Matchers {
+class DaosShuffleWriterPerf extends SparkFunSuite with SharedSparkContext {
 
   @Mock(answer = RETURNS_SMART_NULLS)
   private var shuffleIO: DaosShuffleIO = _
diff --git a/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterSuite.scala b/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterSuite.scala
index 6a500aca..dbd9431c 100644
--- a/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterSuite.scala
+++ b/shuffle-daos/src/test/scala/org/apache/spark/shuffle/daos/DaosShuffleWriterSuite.scala
@@ -27,7 +27,6 @@ import org.mockito.{Mock, Mockito, MockitoAnnotations}
 import org.mockito.Answers._
 import org.mockito.ArgumentMatchers.{any, anyInt}
 import org.mockito.Mockito.{mock, never, when}
-import org.scalatest.Matchers
 
 import scala.collection.mutable
 import org.apache.spark.{Partitioner, SharedSparkContext, ShuffleDependency, SparkFunSuite}
@@ -36,7 +35,7 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.spark.util.Utils
 
-class DaosShuffleWriterSuite extends SparkFunSuite with SharedSparkContext with Matchers {
+class DaosShuffleWriterSuite extends SparkFunSuite with 
SharedSparkContext { @Mock(answer = RETURNS_SMART_NULLS) private var shuffleIO: DaosShuffleIO = _ diff --git a/shuffle-hadoop/pom.xml b/shuffle-hadoop/pom.xml index 1217f00a..70176e6d 100644 --- a/shuffle-hadoop/pom.xml +++ b/shuffle-hadoop/pom.xml @@ -5,7 +5,7 @@ com.intel.oap remote-shuffle-parent - 1.2.0 + 1.2.1 shuffle-hadoop diff --git a/shuffle-hadoop/src/main/scala/org/apache/spark/network/netty/RemoteShuffleTransferService.scala b/shuffle-hadoop/src/main/scala/org/apache/spark/network/netty/RemoteShuffleTransferService.scala index c067bbe9..388ea889 100644 --- a/shuffle-hadoop/src/main/scala/org/apache/spark/network/netty/RemoteShuffleTransferService.scala +++ b/shuffle-hadoop/src/main/scala/org/apache/spark/network/netty/RemoteShuffleTransferService.scala @@ -22,9 +22,8 @@ import java.util.{HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.reflect.ClassTag - import com.codahale.metrics.{Metric, MetricSet} - +import org.apache.spark.internal.Logging import org.apache.spark.{SecurityManager, SparkConf, SparkEnv} import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext} import org.apache.spark.network.buffer.ManagedBuffer @@ -46,7 +45,7 @@ private[spark] class RemoteShuffleTransferService( bindAddress: String, override val hostName: String, _port: Int, - numCores: Int) extends BlockTransferService { + numCores: Int) extends BlockTransferService with Logging { // TODO: Don't use Java serialization, use a more cross-version compatible serialization format. private val serializer = new JavaSerializer(conf) diff --git a/shuffle-hadoop/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/shuffle-hadoop/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 62d57655..d18e1081 100644 --- a/shuffle-hadoop/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/shuffle-hadoop/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -28,7 +28,6 @@ import scala.collection.mutable import scala.collection.mutable.{HashMap, HashSet, ListBuffer} import scala.concurrent.duration._ import scala.util.control.NonFatal - import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} @@ -37,7 +36,9 @@ import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData} +import org.apache.spark.rdd.{RDD, RDDCheckpointData} +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.shuffle.remote.RemoteShuffleManager import org.apache.spark.storage._ @@ -45,79 +46,79 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util._ /** - * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of - * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a - * minimal schedule to run the job. It then submits stages as TaskSets to an underlying - * TaskScheduler implementation that runs them on the cluster. 
A TaskSet contains fully independent - * tasks that can run right away based on the data that's already on the cluster (e.g. map output - * files from previous stages), though it may fail if this data becomes unavailable. - * - * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with - * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks - * in each stage, but operations with shuffle dependencies require multiple stages (one to write a - * set of map output files, and another to read those files after a barrier). In the end, every - * stage will have only shuffle dependencies on other stages, and may compute multiple operations - * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of - * various RDDs - * - * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred - * locations to run each task on, based on the current cache status, and passes these to the - * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being - * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are - * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task - * a small number of times before cancelling the whole stage. - * - * When looking through this code, there are several key concepts: - * - * - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler. - * For example, when the user calls an action, like count(), a job will be submitted through - * submitJob. Each Job may require the execution of multiple stages to build intermediate data. - * - * - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each - * task computes the same function on partitions of the same RDD. Stages are separated at shuffle - * boundaries, which introduce a barrier (where we must wait for the previous stage to finish to - * fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that - * executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle. - * Stages are often shared across multiple jobs, if these jobs reuse the same RDDs. - * - * - Tasks are individual units of work, each sent to one machine. - * - * - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them - * and likewise remembers which shuffle map stages have already produced output files to avoid - * redoing the map side of a shuffle. - * - * - Preferred locations: the DAGScheduler also computes where to run each task in a stage based - * on the preferred locations of its underlying RDDs, or the location of cached or shuffle data. - * - * - Cleanup: all data structures are cleared when the running jobs that depend on them finish, - * to prevent memory leaks in a long-running application. - * - * To recover from failures, the same stage might need to run multiple times, which are called - * "attempts". If the TaskScheduler reports that a task failed because a map output file from a - * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a - * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small - * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost - * stage(s) that compute the missing tasks. 
As part of this process, we might also have to create - * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since - * tasks from the old attempt of a stage could still be running, care must be taken to map any - * events received in the correct Stage object. - * - * Here's a checklist to use when making or reviewing changes to this class: - * - * - All data structures should be cleared when the jobs involving them end to avoid indefinite - * accumulation of state in long-running programs. - * - * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to - * include the new structure. This will help to catch memory leaks. - */ + * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of + * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a + * minimal schedule to run the job. It then submits stages as TaskSets to an underlying + * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent + * tasks that can run right away based on the data that's already on the cluster (e.g. map output + * files from previous stages), though it may fail if this data becomes unavailable. + * + * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with + * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks + * in each stage, but operations with shuffle dependencies require multiple stages (one to write a + * set of map output files, and another to read those files after a barrier). In the end, every + * stage will have only shuffle dependencies on other stages, and may compute multiple operations + * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of + * various RDDs + * + * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred + * locations to run each task on, based on the current cache status, and passes these to the + * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being + * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are + * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task + * a small number of times before cancelling the whole stage. + * + * When looking through this code, there are several key concepts: + * + * - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler. + * For example, when the user calls an action, like count(), a job will be submitted through + * submitJob. Each Job may require the execution of multiple stages to build intermediate data. + * + * - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each + * task computes the same function on partitions of the same RDD. Stages are separated at shuffle + * boundaries, which introduce a barrier (where we must wait for the previous stage to finish to + * fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that + * executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle. + * Stages are often shared across multiple jobs, if these jobs reuse the same RDDs. + * + * - Tasks are individual units of work, each sent to one machine. 
+ * + * - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them + * and likewise remembers which shuffle map stages have already produced output files to avoid + * redoing the map side of a shuffle. + * + * - Preferred locations: the DAGScheduler also computes where to run each task in a stage based + * on the preferred locations of its underlying RDDs, or the location of cached or shuffle data. + * + * - Cleanup: all data structures are cleared when the running jobs that depend on them finish, + * to prevent memory leaks in a long-running application. + * + * To recover from failures, the same stage might need to run multiple times, which are called + * "attempts". If the TaskScheduler reports that a task failed because a map output file from a + * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a + * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small + * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost + * stage(s) that compute the missing tasks. As part of this process, we might also have to create + * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since + * tasks from the old attempt of a stage could still be running, care must be taken to map any + * events received in the correct Stage object. + * + * Here's a checklist to use when making or reviewing changes to this class: + * + * - All data structures should be cleared when the jobs involving them end to avoid indefinite + * accumulation of state in long-running programs. + * + * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to + * include the new structure. This will help to catch memory leaks. + */ private[spark] class DAGScheduler( - private[scheduler] val sc: SparkContext, - private[scheduler] val taskScheduler: TaskScheduler, - listenerBus: LiveListenerBus, - mapOutputTracker: MapOutputTrackerMaster, - blockManagerMaster: BlockManagerMaster, - env: SparkEnv, - clock: Clock = new SystemClock()) + private[scheduler] val sc: SparkContext, + private[scheduler] val taskScheduler: TaskScheduler, + listenerBus: LiveListenerBus, + mapOutputTracker: MapOutputTrackerMaster, + blockManagerMaster: BlockManagerMaster, + env: SparkEnv, + clock: Clock = new SystemClock()) extends Logging { def this(sc: SparkContext, taskScheduler: TaskScheduler) = { @@ -141,11 +142,11 @@ private[spark] class DAGScheduler( private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] private[scheduler] val stageIdToStage = new HashMap[Int, Stage] /** - * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for - * that dependency. Only includes stages that are part of currently running job (when the job(s) - * that require the shuffle stage complete, the mapping will be removed, and the only record of - * the shuffle data will be in the MapOutputTracker). - */ + * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for + * that dependency. Only includes stages that are part of currently running job (when the job(s) + * that require the shuffle stage complete, the mapping will be removed, and the only record of + * the shuffle data will be in the MapOutputTracker). 
+ */ private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage] private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] @@ -161,21 +162,42 @@ private[spark] class DAGScheduler( private[scheduler] val activeJobs = new HashSet[ActiveJob] /** - * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids - * and its values are arrays indexed by partition numbers. Each array value is the set of - * locations where that RDD partition is cached. - * - * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454). - */ + * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids + * and its values are arrays indexed by partition numbers. Each array value is the set of + * locations where that RDD partition is cached. + * + * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454). + */ private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]] - // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with - // every task. When we detect a node failing, we note the current epoch number and failed - // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results. - // - // TODO: Garbage collect information about failure epochs when we know there are no more - // stray messages to detect. - private val failedEpoch = new HashMap[String, Long] + /** + * Tracks the latest epoch of a fully processed error related to the given executor. (We use + * the MapOutputTracker's epoch number, which is sent with every task.) + * + * When an executor fails, it can affect the results of many tasks, and we have to deal with + * all of them consistently. We don't simply ignore all future results from that executor, + * as the failures may have been transient; but we also don't want to "overreact" to follow- + * on errors we receive. Furthermore, we might receive notification of a task success, after + * we find out the executor has actually failed; we'll assume those successes are, in fact, + * simply delayed notifications and the results have been lost, if the tasks started in the + * same or an earlier epoch. In particular, we use this to control when we tell the + * BlockManagerMaster that the BlockManager has been lost. + */ + private val executorFailureEpoch = new HashMap[String, Long] + + /** + * Tracks the latest epoch of a fully processed error where shuffle files have been lost from + * the given executor. + * + * This is closely related to executorFailureEpoch. They only differ for the executor when + * there is an external shuffle service serving shuffle files and we haven't been notified that + * the entire worker has been lost. In that case, when an executor is lost, we do not update + * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor + * fails, we do not unregister the shuffle data as it can still be served; but if there is + * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle + * data only once, even if we get many fetch failures. + */ + private val shuffleFileLostEpoch = new HashMap[String, Long] private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator @@ -186,36 +208,38 @@ private[spark] class DAGScheduler( /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. 
*/ private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY) + private val shouldMergeResourceProfiles = sc.getConf.get(config.RESOURCE_PROFILE_MERGE_CONFLICTS) + /** - * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure, - * this is set default to false, which means, we only unregister the outputs related to the exact - * executor(instead of the host) on a FetchFailure. - */ + * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure, + * this is set default to false, which means, we only unregister the outputs related to the exact + * executor(instead of the host) on a FetchFailure. + */ private[scheduler] val unRegisterOutputOnHostOnFetchFailure = sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE) /** - * Number of consecutive stage attempts allowed before a stage is aborted. - */ + * Number of consecutive stage attempts allowed before a stage is aborted. + */ private[scheduler] val maxConsecutiveStageAttempts = sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS) /** - * Number of max concurrent tasks check failures for each barrier job. - */ + * Number of max concurrent tasks check failures for each barrier job. + */ private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int] /** - * Time in seconds to wait between a max concurrent tasks check failure and the next check. - */ + * Time in seconds to wait between a max concurrent tasks check failure and the next check. + */ private val timeIntervalNumTasksCheck = sc.getConf .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL) /** - * Max number of max concurrent tasks check failures allowed for a job before fail the job - * submission. - */ + * Max number of max concurrent tasks check failures allowed for a job before fail the job + * submission. + */ private val maxFailureNumTasksCheck = sc.getConf .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES) @@ -225,47 +249,49 @@ private[spark] class DAGScheduler( private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf) + /** - * Called by the TaskSetManager to report task's starting. - */ + * Called by the TaskSetManager to report task's starting. + */ def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = { eventProcessLoop.post(BeginEvent(task, taskInfo)) } /** - * Called by the TaskSetManager to report that a task has completed - * and results are being fetched remotely. - */ + * Called by the TaskSetManager to report that a task has completed + * and results are being fetched remotely. + */ def taskGettingResult(taskInfo: TaskInfo): Unit = { eventProcessLoop.post(GettingResultEvent(taskInfo)) } /** - * Called by the TaskSetManager to report task completions or failures. - */ + * Called by the TaskSetManager to report task completions or failures. 
+ */ def taskEnded( - task: Task[_], - reason: TaskEndReason, - result: Any, - accumUpdates: Seq[AccumulatorV2[_, _]], - metricPeaks: Array[Long], - taskInfo: TaskInfo): Unit = { + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + metricPeaks: Array[Long], + taskInfo: TaskInfo): Unit = { eventProcessLoop.post( CompletionEvent(task, reason, result, accumUpdates, metricPeaks, taskInfo)) } /** - * Update metrics for in-progress tasks and let the master know that the BlockManager is still - * alive. Return true if the driver knows about the given block manager. Otherwise, return false, - * indicating that the block manager should re-register. - */ + * Update metrics for in-progress tasks and let the master know that the BlockManager is still + * alive. Return true if the driver knows about the given block manager. Otherwise, return false, + * indicating that the block manager should re-register. + */ def executorHeartbeatReceived( - execId: String, - // (taskId, stageId, stageAttemptId, accumUpdates) - accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])], - blockManagerId: BlockManagerId, - // (stageId, stageAttemptId) -> metrics - executorUpdates: mutable.Map[(Int, Int), ExecutorMetrics]): Boolean = { + execId: String, + // (taskId, stageId, stageAttemptId, accumUpdates) + accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])], + blockManagerId: BlockManagerId, + // (stageId, stageAttemptId) -> metrics + executorUpdates: mutable.Map[(Int, Int), ExecutorMetrics]): Boolean = { listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates, executorUpdates)) blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean]( @@ -273,41 +299,61 @@ private[spark] class DAGScheduler( } /** - * Called by TaskScheduler implementation when an executor fails. - */ + * Called by TaskScheduler implementation when an executor fails. + */ def executorLost(execId: String, reason: ExecutorLossReason): Unit = { eventProcessLoop.post(ExecutorLost(execId, reason)) } /** - * Called by TaskScheduler implementation when a worker is removed. - */ + * Called by TaskScheduler implementation when a worker is removed. + */ def workerRemoved(workerId: String, host: String, message: String): Unit = { eventProcessLoop.post(WorkerRemoved(workerId, host, message)) } /** - * Called by TaskScheduler implementation when a host is added. - */ + * Called by TaskScheduler implementation when a host is added. + */ def executorAdded(execId: String, host: String): Unit = { eventProcessLoop.post(ExecutorAdded(execId, host)) } /** - * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or - * cancellation of the job itself. - */ + * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or + * cancellation of the job itself. + */ def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = { eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception)) } /** - * Called by the TaskSetManager when it decides a speculative task is needed. - */ + * Called by the TaskSetManager when it decides a speculative task is needed. + */ def speculativeTaskSubmitted(task: Task[_]): Unit = { eventProcessLoop.post(SpeculativeTaskSubmitted(task)) } + /** + * Called by the TaskSetManager when a taskset becomes unschedulable due to executors being + * excluded because of too many task failures and dynamic allocation is enabled. 
+ */ + def unschedulableTaskSetAdded( + stageId: Int, + stageAttemptId: Int): Unit = { + eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId)) + } + + /** + * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic + * allocation is enabled. + */ + def unschedulableTaskSetRemoved( + stageId: Int, + stageAttemptId: Int): Unit = { + eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId)) + } + private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times @@ -332,13 +378,13 @@ private[spark] class DAGScheduler( } /** - * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the - * shuffle map stage doesn't already exist, this method will create the shuffle map stage in - * addition to any missing ancestor shuffle map stages. - */ + * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the + * shuffle map stage doesn't already exist, this method will create the shuffle map stage in + * addition to any missing ancestor shuffle map stages. + */ private def getOrCreateShuffleMapStage( - shuffleDep: ShuffleDependency[_, _, _], - firstJobId: Int): ShuffleMapStage = { + shuffleDep: ShuffleDependency[_, _, _], + firstJobId: Int): ShuffleMapStage = { shuffleIdToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage @@ -361,38 +407,41 @@ private[spark] class DAGScheduler( } /** - * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The - * following patterns are not supported: - * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg. - * union()/coalesce()/first()/take()/PartitionPruningRDD); - * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)). - */ + * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The + * following patterns are not supported: + * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (e.g. + * union()/coalesce()/first()/take()/PartitionPruningRDD); + * 2. An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)). + */ private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numTasksInStage: Int): Unit = { if (rdd.isBarrier() && - !traverseParentRDDsWithinStage(rdd, (r: RDD[_]) => - r.getNumPartitions == numTasksInStage && + !traverseParentRDDsWithinStage(rdd, (r: RDD[_]) => + r.getNumPartitions == numTasksInStage && r.dependencies.count(_.rdd.isBarrier()) <= 1)) { throw new BarrierJobUnsupportedRDDChainException } } /** - * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a - * previously run stage generated the same shuffle data, this function will copy the output - * locations that are still available from the previous shuffle to avoid unnecessarily - * regenerating data. - */ + * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a + * previously run stage generated the same shuffle data, this function will copy the output + * locations that are still available from the previous shuffle to avoid unnecessarily + * regenerating data. 
+ */ def createShuffleMapStage[K, V, C]( - shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = { + shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd + val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd) + val resourceProfile = mergeResourceProfilesForStage(resourceProfiles) checkBarrierStageWithDynamicAllocation(rdd) - checkBarrierStageWithNumSlots(rdd) + checkBarrierStageWithNumSlots(rdd, resourceProfile) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length - val parents = getOrCreateParentStages(rdd, jobId) + val parents = getOrCreateParentStages(shuffleDeps, jobId) val id = nextStageId.getAndIncrement() val stage = new ShuffleMapStage( - id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker) + id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker, + resourceProfile.id) stageIdToStage(id) = stage shuffleIdToMapStage(shuffleDep.shuffleId) = stage @@ -409,16 +458,16 @@ private[spark] class DAGScheduler( } /** - * We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead - * to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that - * we acquire some executors (but not enough to launch all the tasks in a barrier stage) and - * later release them due to executor idle time expire, and then acquire again). - * - * We perform the check on job submit and fail fast if running a barrier stage with dynamic - * resource allocation enabled. - * - * TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage - */ + * We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead + * to some confusing behaviors (e.g. with dynamic resource allocation enabled, it may happen that + * we acquire some executors (but not enough to launch all the tasks in a barrier stage) and + * later release them due to executor idle time expire, and then acquire again). + * + * We perform the check on job submit and fail fast if running a barrier stage with dynamic + * resource allocation enabled. + * + * TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage + */ private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = { if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) { throw new BarrierJobRunWithDynamicAllocationException @@ -426,53 +475,115 @@ private[spark] class DAGScheduler( } /** - * Check whether the barrier stage requires more slots (to be able to launch all tasks in the - * barrier stage together) than the total number of active slots currently. Fail current check - * if trying to submit a barrier stage that requires more slots than current total number. If - * the check fails consecutively beyond a configured number for a job, then fail current job - * submission. - */ - private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = { - val numPartitions = rdd.getNumPartitions - val maxNumConcurrentTasks = sc.maxNumConcurrentTasks - if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) { - throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks) + * Check whether the barrier stage requires more slots (to be able to launch all tasks in the + * barrier stage together) than the total number of active slots currently. 
Fail current check + * if trying to submit a barrier stage that requires more slots than current total number. If + * the check fails consecutively beyond a configured number for a job, then fail current job + * submission. + */ + private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = { + if (rdd.isBarrier()) { + val numPartitions = rdd.getNumPartitions + val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp) + if (numPartitions > maxNumConcurrentTasks) { + throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks) + } } } + private[scheduler] def mergeResourceProfilesForStage( + stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = { + logDebug(s"Merging stage rdd profiles: $stageResourceProfiles") + val resourceProfile = if (stageResourceProfiles.size > 1) { + if (shouldMergeResourceProfiles) { + val startResourceProfile = stageResourceProfiles.head + val mergedProfile = stageResourceProfiles.drop(1) + .foldLeft(startResourceProfile)((a, b) => mergeResourceProfiles(a, b)) + // compared merged profile with existing ones so we don't add it over and over again + // if the user runs the same operation multiple times + val resProfile = sc.resourceProfileManager.getEquivalentProfile(mergedProfile) + resProfile match { + case Some(existingRp) => existingRp + case None => + // this ResourceProfile could be different if it was merged so we have to add it to + // our ResourceProfileManager + sc.resourceProfileManager.addResourceProfile(mergedProfile) + mergedProfile + } + } else { + throw new IllegalArgumentException("Multiple ResourceProfiles specified in the RDDs for " + + "this stage, either resolve the conflicting ResourceProfiles yourself or enable " + + s"${config.RESOURCE_PROFILE_MERGE_CONFLICTS.key} and understand how Spark handles " + + "the merging them.") + } + } else { + if (stageResourceProfiles.size == 1) { + stageResourceProfiles.head + } else { + sc.resourceProfileManager.defaultResourceProfile + } + } + resourceProfile + } + + // This is a basic function to merge resource profiles that takes the max + // value of the profiles. We may want to make this more complex in the future as + // you may want to sum some resources (like memory). + private[scheduler] def mergeResourceProfiles( + r1: ResourceProfile, + r2: ResourceProfile): ResourceProfile = { + val mergedExecKeys = r1.executorResources ++ r2.executorResources + val mergedExecReq = mergedExecKeys.map { case (k, v) => + val larger = r1.executorResources.get(k).map( x => + if (x.amount > v.amount) x else v).getOrElse(v) + k -> larger + } + val mergedTaskKeys = r1.taskResources ++ r2.taskResources + val mergedTaskReq = mergedTaskKeys.map { case (k, v) => + val larger = r1.taskResources.get(k).map( x => + if (x.amount > v.amount) x else v).getOrElse(v) + k -> larger + } + new ResourceProfile(mergedExecReq, mergedTaskReq) + } + /** - * Create a ResultStage associated with the provided jobId. - */ + * Create a ResultStage associated with the provided jobId. 
+ */ private def createResultStage( - rdd: RDD[_], - func: (TaskContext, Iterator[_]) => _, - partitions: Array[Int], - jobId: Int, - callSite: CallSite): ResultStage = { + rdd: RDD[_], + func: (TaskContext, Iterator[_]) => _, + partitions: Array[Int], + jobId: Int, + callSite: CallSite): ResultStage = { + val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd) + val resourceProfile = mergeResourceProfilesForStage(resourceProfiles) checkBarrierStageWithDynamicAllocation(rdd) - checkBarrierStageWithNumSlots(rdd) + checkBarrierStageWithNumSlots(rdd, resourceProfile) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) - val parents = getOrCreateParentStages(rdd, jobId) + val parents = getOrCreateParentStages(shuffleDeps, jobId) val id = nextStageId.getAndIncrement() - val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) + val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, + callSite, resourceProfile.id) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage } /** - * Get or create the list of parent stages for a given RDD. The new Stages will be created with - * the provided firstJobId. - */ - private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { - getShuffleDependencies(rdd).map { shuffleDep => + * Get or create the list of parent stages for the given shuffle dependencies. The new + * Stages will be created with the provided firstJobId. + */ + private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]], + firstJobId: Int): List[Stage] = { + shuffleDeps.map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList } /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ private def getMissingAncestorShuffleDependencies( - rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = { + rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = { val ancestors = new ListBuffer[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError @@ -483,7 +594,8 @@ private[spark] class DAGScheduler( val toVisit = waitingForVisit.remove(0) if (!visited(toVisit)) { visited += toVisit - getShuffleDependencies(toVisit).foreach { shuffleDep => + val (shuffleDeps, _) = getShuffleDependenciesAndResourceProfiles(toVisit) + shuffleDeps.foreach { shuffleDep => if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) { ancestors.prepend(shuffleDep) waitingForVisit.prepend(shuffleDep.rdd) @@ -495,20 +607,22 @@ private[spark] class DAGScheduler( } /** - * Returns shuffle dependencies that are immediate parents of the given RDD. - * - * This function will not return more distant ancestors. For example, if C has a shuffle - * dependency on B which has a shuffle dependency on A: - * - * A <-- B <-- C - * - * calling this function with rdd C will only return the B <-- C dependency. - * - * This function is scheduler-visible for the purpose of unit testing. - */ - private[scheduler] def getShuffleDependencies( - rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { + * Returns shuffle dependencies that are immediate parents of the given RDD and the + * ResourceProfiles associated with the RDDs for this stage. + * + * This function will not return more distant ancestors for shuffle dependencies. 
For example, + * if C has a shuffle dependency on B which has a shuffle dependency on A: + * + * A <-- B <-- C + * + * calling this function with rdd C will only return the B <-- C dependency. + * + * This function is scheduler-visible for the purpose of unit testing. + */ + private[scheduler] def getShuffleDependenciesAndResourceProfiles( + rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = { val parents = new HashSet[ShuffleDependency[_, _, _]] + val resourceProfiles = new HashSet[ResourceProfile] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] waitingForVisit += rdd @@ -516,6 +630,7 @@ private[spark] class DAGScheduler( val toVisit = waitingForVisit.remove(0) if (!visited(toVisit)) { visited += toVisit + Option(toVisit.getResourceProfile).foreach(resourceProfiles += _) toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep @@ -524,13 +639,13 @@ private[spark] class DAGScheduler( } } } - parents + (parents, resourceProfiles) } /** - * Traverses the given RDD and its ancestors within the same stage and checks whether all of the - * RDDs satisfy a given predicate. - */ + * Traverses the given RDD and its ancestors within the same stage and checks whether all of the + * RDDs satisfy a given predicate. + */ private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = { val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] @@ -544,7 +659,7 @@ private[spark] class DAGScheduler( visited += toVisit toVisit.dependencies.foreach { case _: ShuffleDependency[_, _, _] => - // Not within the same stage with current rdd, do nothing. + // Not within the same stage with current rdd, do nothing. case dependency => waitingForVisit.prepend(dependency.rdd) } @@ -586,9 +701,9 @@ private[spark] class DAGScheduler( } /** - * Registers the given jobId among the jobs that need the given stage and - * all of that stage's ancestors. - */ + * Registers the given jobId among the jobs that need the given stage and + * all of that stage's ancestors. + */ private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = { @tailrec def updateJobIdStageIdMapsList(stages: List[Stage]): Unit = { @@ -604,11 +719,11 @@ private[spark] class DAGScheduler( } /** - * Removes state for job and any stages that are not needed by any other job. Does not - * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks. - * - * @param job The job whose state to cleanup. - */ + * Removes state for job and any stages that are not needed by any other job. Does not + * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks. + * + * @param job The job whose state to cleanup. + */ private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = { val registeredStages = jobIdToStageIds.get(job.jobId) if (registeredStages.isEmpty || registeredStages.get.isEmpty) { @@ -620,7 +735,7 @@ private[spark] class DAGScheduler( if (!jobSet.contains(job.jobId)) { logError( "Job %d not registered for stage %d even though that stage was registered for the job" - .format(job.jobId, stageId)) + .format(job.jobId, stageId)) } else { def removeStage(stageId: Int): Unit = { // data structures based on Stage @@ -664,28 +779,28 @@ private[spark] class DAGScheduler( } /** - * Submit an action job to the scheduler. 
- * - * @param rdd target RDD to run tasks on - * @param func a function to run on each partition of the RDD - * @param partitions set of partitions to run on; some jobs may not want to compute on all - * partitions of the target RDD, e.g. for operations like first() - * @param callSite where in the user program this job was called - * @param resultHandler callback to pass each result to - * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name - * - * @return a JobWaiter object that can be used to block until the job finishes executing - * or can be used to cancel the job. - * - * @throws IllegalArgumentException when partitions ids are illegal - */ + * Submit an action job to the scheduler. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like first() + * @param callSite where in the user program this job was called + * @param resultHandler callback to pass each result to + * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name + * + * @return a JobWaiter object that can be used to block until the job finishes executing + * or can be used to cancel the job. + * + * @throws IllegalArgumentException when partitions ids are illegal + */ def submitJob[T, U]( - rdd: RDD[T], - func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], - callSite: CallSite, - resultHandler: (Int, U) => Unit, - properties: Properties): JobWaiter[U] = { + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + callSite: CallSite, + resultHandler: (Int, U) => Unit, + properties: Properties): JobWaiter[U] = { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => @@ -719,36 +834,36 @@ private[spark] class DAGScheduler( } /** - * Run an action job on the given RDD and pass all the results to the resultHandler function as - * they arrive. - * - * @param rdd target RDD to run tasks on - * @param func a function to run on each partition of the RDD - * @param partitions set of partitions to run on; some jobs may not want to compute on all - * partitions of the target RDD, e.g. for operations like first() - * @param callSite where in the user program this job was called - * @param resultHandler callback to pass each result to - * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name - * - * @note Throws `Exception` when the job fails - */ + * Run an action job on the given RDD and pass all the results to the resultHandler function as + * they arrive. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like first() + * @param callSite where in the user program this job was called + * @param resultHandler callback to pass each result to + * @param properties scheduler properties to attach to this job, e.g. 
fair scheduler pool name + * + * @note Throws `Exception` when the job fails + */ def runJob[T, U]( - rdd: RDD[T], - func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], - callSite: CallSite, - resultHandler: (Int, U) => Unit, - properties: Properties): Unit = { + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + callSite: CallSite, + resultHandler: (Int, U) => Unit, + properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format - (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format - (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) @@ -757,28 +872,29 @@ private[spark] class DAGScheduler( } /** - * Run an approximate job on the given RDD and pass all the results to an ApproximateEvaluator - * as they arrive. Returns a partial result object from the evaluator. - * - * @param rdd target RDD to run tasks on - * @param func a function to run on each partition of the RDD - * @param evaluator `ApproximateEvaluator` to receive the partial results - * @param callSite where in the user program this job was called - * @param timeout maximum time to wait for the job, in milliseconds - * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name - */ + * Run an approximate job on the given RDD and pass all the results to an ApproximateEvaluator + * as they arrive. Returns a partial result object from the evaluator. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param evaluator `ApproximateEvaluator` to receive the partial results + * @param callSite where in the user program this job was called + * @param timeout maximum time to wait for the job, in milliseconds + * @param properties scheduler properties to attach to this job, e.g. 
fair scheduler pool name + */ def runApproximateJob[T, U, R]( - rdd: RDD[T], - func: (TaskContext, Iterator[T]) => U, - evaluator: ApproximateEvaluator[U, R], - callSite: CallSite, - timeout: Long, - properties: Properties): PartialResult[R] = { + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + evaluator: ApproximateEvaluator[U, R], + callSite: CallSite, + timeout: Long, + properties: Properties): PartialResult[R] = { val jobId = nextJobId.getAndIncrement() + val clonedProperties = Utils.cloneProperties(properties) if (rdd.partitions.isEmpty) { // Return immediately if the job is running 0 tasks val time = clock.getTimeMillis() - listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties)) + listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), clonedProperties)) listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded)) return new PartialResult(evaluator.currentResult(), true) } @@ -786,27 +902,27 @@ private[spark] class DAGScheduler( val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener, - Utils.cloneProperties(properties))) + clonedProperties)) listener.awaitResult() // Will throw an exception if the job fails } /** - * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter - * can be used to block until the job finishes executing or can be used to cancel the job. - * This method is used for adaptive query planning, to run map stages and look at statistics - * about their outputs before submitting downstream stages. - * - * @param dependency the ShuffleDependency to run a map stage for - * @param callback function called with the result of the job, which in this case will be a - * single MapOutputStatistics object showing how much data was produced for each partition - * @param callSite where in the user program this job was submitted - * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name - */ + * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter + * can be used to block until the job finishes executing or can be used to cancel the job. + * This method is used for adaptive query planning, to run map stages and look at statistics + * about their outputs before submitting downstream stages. + * + * @param dependency the ShuffleDependency to run a map stage for + * @param callback function called with the result of the job, which in this case will be a + * single MapOutputStatistics object showing how much data was produced for each partition + * @param callSite where in the user program this job was submitted + * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name + */ def submitMapStage[K, V, C]( - dependency: ShuffleDependency[K, V, C], - callback: MapOutputStatistics => Unit, - callSite: CallSite, - properties: Properties): JobWaiter[MapOutputStatistics] = { + dependency: ShuffleDependency[K, V, C], + callback: MapOutputStatistics => Unit, + callSite: CallSite, + properties: Properties): JobWaiter[MapOutputStatistics] = { val rdd = dependency.rdd val jobId = nextJobId.getAndIncrement() @@ -828,24 +944,24 @@ private[spark] class DAGScheduler( } /** - * Cancel a job that is running or waiting in the queue. - */ + * Cancel a job that is running or waiting in the queue. 
+ */ def cancelJob(jobId: Int, reason: Option[String]): Unit = { logInfo("Asked to cancel job " + jobId) eventProcessLoop.post(JobCancelled(jobId, reason)) } /** - * Cancel all jobs in the given job group ID. - */ + * Cancel all jobs in the given job group ID. + */ def cancelJobGroup(groupId: String): Unit = { logInfo("Asked to cancel job group " + groupId) eventProcessLoop.post(JobGroupCancelled(groupId)) } /** - * Cancel all jobs that are running or waiting in the queue. - */ + * Cancel all jobs that are running or waiting in the queue. + */ def cancelAllJobs(): Unit = { eventProcessLoop.post(AllJobsCancelled) } @@ -859,25 +975,25 @@ private[spark] class DAGScheduler( } /** - * Cancel all jobs associated with a running or scheduled stage. - */ + * Cancel all jobs associated with a running or scheduled stage. + */ def cancelStage(stageId: Int, reason: Option[String]): Unit = { eventProcessLoop.post(StageCancelled(stageId, reason)) } /** - * Kill a given task. It will be retried. - * - * @return Whether the task was successfully killed. - */ + * Kill a given task. It will be retried. + * + * @return Whether the task was successfully killed. + */ def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = { taskScheduler.killTaskAttempt(taskId, interruptThread, reason) } /** - * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since - * the last fetch failure. - */ + * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since + * the last fetch failure. + */ private[scheduler] def resubmitFailedStages(): Unit = { if (failedStages.nonEmpty) { // Failed stages may be removed by job cancellation, so failed might be empty even if @@ -893,10 +1009,10 @@ private[spark] class DAGScheduler( } /** - * Check for waiting stages which are now eligible for resubmission. - * Submits stages that depend on the given parent stage. Called when the parent stage completes - * successfully. - */ + * Check for waiting stages which are now eligible for resubmission. + * Submits stages that depend on the given parent stage. Called when the parent stage completes + * successfully. + */ private def submitWaitingChildStages(parent: Stage): Unit = { logTrace(s"Checking if any dependencies of $parent are now runnable") logTrace("running: " + runningStages) @@ -929,14 +1045,14 @@ private[spark] class DAGScheduler( } val jobIds = activeInGroup.map(_.jobId) jobIds.foreach(handleJobCancellation(_, - Option("part of cancelled job group %s".format(groupId)))) + Option("part of cancelled job group %s".format(groupId)))) } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = { // Note that there is a chance that this task is launched after the stage is cancelled. // In that case, we wouldn't have the stage anymore in stageIdToStage. 
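The cancellation entry points in the hunk above (cancelJob, cancelJobGroup, cancelAllJobs) only post events to the scheduler's event loop; the user-facing hooks live on SparkContext. Below is a minimal sketch of how a caller would exercise the job-group path, assuming a local SparkContext; the group id "etl-batch" and the single-thread ExecutionContext are illustrative and not part of this patch.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{SparkConf, SparkContext}

object CancelJobGroupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cancel-sketch"))
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

    // Run the slow action on a separate thread so the driver thread stays free to cancel it.
    val result = Future {
      // Job-group tags are thread-local properties, so set them on the thread that runs the job.
      // interruptOnCancel = true is what shouldInterruptTaskThread() later reads back.
      sc.setJobGroup("etl-batch", "long running aggregation", interruptOnCancel = true)
      sc.parallelize(1 to 1000000, 8).map { i => Thread.sleep(1); i }.count()
    }

    // Cancelling the group posts JobGroupCancelled; the DAGScheduler then cancels every
    // active job carrying this group id.
    Thread.sleep(2000)
    sc.cancelJobGroup("etl-batch")

    result.failed.foreach(e => println(s"Job ended with: ${e.getMessage}"))
    Thread.sleep(1000)
    sc.stop()
  }
}
```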
val stageAttemptId = - stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1) + stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1) listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) } @@ -944,10 +1060,22 @@ private[spark] class DAGScheduler( listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)) } + private[scheduler] def handleUnschedulableTaskSetAdded( + stageId: Int, + stageAttemptId: Int): Unit = { + listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId)) + } + + private[scheduler] def handleUnschedulableTaskSetRemoved( + stageId: Int, + stageAttemptId: Int): Unit = { + listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId)) + } + private[scheduler] def handleTaskSetFailed( - taskSet: TaskSet, - reason: String, - exception: Option[Throwable]): Unit = { + taskSet: TaskSet, + reason: String, + exception: Option[Throwable]): Unit = { stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) } } @@ -974,12 +1102,12 @@ private[spark] class DAGScheduler( } private[scheduler] def handleJobSubmitted(jobId: Int, - finalRDD: RDD[_], - func: (TaskContext, Iterator[_]) => _, - partitions: Array[Int], - callSite: CallSite, - listener: JobListener, - properties: Properties): Unit = { + finalRDD: RDD[_], + func: (TaskContext, Iterator[_]) => _, + partitions: Array[Int], + callSite: CallSite, + listener: JobListener, + properties: Properties): Unit = { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a @@ -1035,15 +1163,16 @@ private[spark] class DAGScheduler( val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( - SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) + SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, + Utils.cloneProperties(properties))) submitStage(finalStage) } private[scheduler] def handleMapStageSubmitted(jobId: Int, - dependency: ShuffleDependency[_, _, _], - callSite: CallSite, - listener: JobListener, - properties: Properties): Unit = { + dependency: ShuffleDependency[_, _, _], + callSite: CallSite, + listener: JobListener, + properties: Properties): Unit = { // Submitting this map stage might still require the creation of some parent stages, so make // sure that happens. var finalStage: ShuffleMapStage = null @@ -1073,7 +1202,8 @@ private[spark] class DAGScheduler( val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( - SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) + SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, + Utils.cloneProperties(properties))) submitStage(finalStage) // If the whole stage has already finished, tell the listener and remove it @@ -1106,6 +1236,54 @@ private[spark] class DAGScheduler( } } + /** + * `PythonRunner` needs to know what the pyspark memory and cores settings are for the profile + * being run. Pass them in the local properties of the task if it's set for the stage profile. 
+ */ + private def addPySparkConfigsToProperties(stage: Stage, properties: Properties): Unit = { + val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId) + val pysparkMem = rp.getPySparkMemory + // use the getOption on EXECUTOR_CORES.key instead of using the EXECUTOR_CORES config reader + // because the default for this config isn't correct for standalone mode. Here we want + // to know if it was explicitly set or not. The default profile always has it set to either + // what user specified or default so special case it here. + val execCores = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) { + sc.conf.getOption(config.EXECUTOR_CORES.key) + } else { + val profCores = rp.getExecutorCores.map(_.toString) + if (profCores.isEmpty) sc.conf.getOption(config.EXECUTOR_CORES.key) else profCores + } + pysparkMem.map(mem => properties.setProperty(PYSPARK_MEMORY_LOCAL_PROPERTY, mem.toString)) + execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores)) + } + + /** + * If push based shuffle is enabled, set the shuffle services to be used for the given + * shuffle map stage for block push/merge. + * + * Even with dynamic resource allocation kicking in and significantly reducing the number + * of available active executors, we would still be able to get sufficient shuffle service + * locations for block push/merge by getting the historical locations of past executors. + */ + private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { + // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize + // TODO changes we cannot disable shuffle merge for the retry/reuse cases + val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( + stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) + + if (mergerLocs.nonEmpty) { + stage.shuffleDep.setMergerLocs(mergerLocs) + logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" + + s" ${stage.shuffleDep.getMergerLocs.size} merger locations") + + logDebug("List of shuffle push merger locations " + + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") + } else { + logInfo("No available merger locations." + + s" Push-based shuffle disabled for $stage (${stage.name})") + } + } + /** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int): Unit = { logDebug("submitMissingTasks(" + stage + ")") @@ -1125,6 +1303,7 @@ private[spark] class DAGScheduler( // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties + addPySparkConfigsToProperties(stage, properties) runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are @@ -1134,6 +1313,12 @@ private[spark] class DAGScheduler( stage match { case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) + // Only generate merger location for a given shuffle dependency once. This way, even if + // this stage gets retried, it would still be merging blocks using the same set of + // shuffle services. 
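prepareShuffleServicesForShuffleMapStage and the comment above hinge on one idea: merger locations are attached to the ShuffleDependency rather than to a stage attempt, so a retried stage keeps pushing blocks to the same shuffle services. The sketch below isolates that "resolve once, then reuse" guard; ShuffleDep, MergerLoc and prepareMergers are stand-ins invented for illustration, not the Spark classes.

```scala
// Stand-in types for illustration only; the real state lives on ShuffleDependency and
// the lookup is SchedulerBackend.getShufflePushMergerLocations in Spark.
final case class MergerLoc(host: String)

final class ShuffleDep(val numPartitions: Int) {
  // Once set, retries of the map stage keep using the same merger locations.
  private var mergerLocs: Seq[MergerLoc] = Nil
  def setMergerLocs(locs: Seq[MergerLoc]): Unit = mergerLocs = locs
  def getMergerLocs: Seq[MergerLoc] = mergerLocs
}

object PushShuffleSketch {
  /** Ask the cluster for merger locations only if none were assigned yet. */
  def prepareMergers(dep: ShuffleDep, lookup: Int => Seq[MergerLoc]): Unit = {
    if (dep.getMergerLocs.nonEmpty) return // stage retry: reuse the previous mergers
    val locs = lookup(dep.numPartitions)
    if (locs.nonEmpty) {
      dep.setMergerLocs(locs)
      println(s"Push-based shuffle enabled with ${locs.size} merger locations")
    } else {
      println("No available merger locations, push-based shuffle disabled for this stage")
    }
  }

  def main(args: Array[String]): Unit = {
    val dep = new ShuffleDep(numPartitions = 200)
    prepareMergers(dep, _ => Seq(MergerLoc("host-a"), MergerLoc("host-b"))) // first attempt
    prepareMergers(dep, _ => Seq(MergerLoc("host-c")))                      // retry: no change
    assert(dep.getMergerLocs.map(_.host) == Seq("host-a", "host-b"))
  }
}
```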
+ if (pushBasedShuffleEnabled) { + prepareShuffleServicesForShuffleMapStage(s) + } case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) @@ -1151,7 +1336,8 @@ private[spark] class DAGScheduler( } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) - listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) + listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, + Utils.cloneProperties(properties))) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return @@ -1165,7 +1351,8 @@ private[spark] class DAGScheduler( if (partitionsToCompute.nonEmpty) { stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } - listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) + listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, + Utils.cloneProperties(properties))) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast @@ -1251,7 +1438,8 @@ private[spark] class DAGScheduler( logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( - tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) + tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties, + stage.resourceProfileId)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run @@ -1260,9 +1448,9 @@ private[spark] class DAGScheduler( stage match { case stage: ShuffleMapStage => logDebug(s"Stage ${stage} is actually done; " + - s"(available: ${stage.isAvailable}," + - s"available outputs: ${stage.numAvailableOutputs}," + - s"partitions: ${stage.numPartitions})") + s"(available: ${stage.isAvailable}," + + s"available outputs: ${stage.numAvailableOutputs}," + + s"partitions: ${stage.numPartitions})") markMapStageJobsAsFinished(stage) case stage : ResultStage => logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})") @@ -1272,15 +1460,15 @@ private[spark] class DAGScheduler( } /** - * Merge local values from a task into the corresponding accumulators previously registered - * here on the driver. - * - * Although accumulators themselves are not thread-safe, this method is called only from one - * thread, the one that runs the scheduling loop. This means we only handle one task - * completion event at a time so we don't need to worry about locking the accumulators. - * This still doesn't stop the caller from updating the accumulator outside the scheduler, - * but that's not our problem since there's nothing we can do about that. - */ + * Merge local values from a task into the corresponding accumulators previously registered + * here on the driver. + * + * Although accumulators themselves are not thread-safe, this method is called only from one + * thread, the one that runs the scheduling loop. This means we only handle one task + * completion event at a time so we don't need to worry about locking the accumulators. + * This still doesn't stop the caller from updating the accumulator outside the scheduler, + * but that's not our problem since there's nothing we can do about that. 
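A recurring change in these hunks is wrapping the job properties in Utils.cloneProperties before posting SparkListenerJobStart and SparkListenerStageSubmitted. The listener bus delivers events asynchronously, so a listener should see a snapshot rather than a live Properties object the user may keep mutating. Here is a small sketch of that defensive-copy idea using only java.util.Properties; the cloneSnapshot helper is illustrative, Spark uses its internal Utils.cloneProperties.

```scala
import java.util.Properties

object ClonePropertiesSketch {
  /** Defensive copy: later mutations of `src` are invisible to whoever holds the copy. */
  def cloneSnapshot(src: Properties): Properties = {
    val copy = new Properties()
    if (src != null) {
      src.stringPropertyNames().forEach(k => copy.setProperty(k, src.getProperty(k)))
    }
    copy
  }

  def main(args: Array[String]): Unit = {
    val live = new Properties()
    live.setProperty("spark.scheduler.pool", "etl")

    val snapshot = cloneSnapshot(live)                  // what an event should carry
    live.setProperty("spark.scheduler.pool", "adhoc")   // user mutates afterwards

    // The snapshot still reflects the value at event-creation time.
    assert(snapshot.getProperty("spark.scheduler.pool") == "etl")
    assert(live.getProperty("spark.scheduler.pool") == "adhoc")
  }
}
```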
+ */ private def updateAccumulators(event: CompletionEvent): Unit = { val task = event.task val stage = stageIdToStage(task.stageId) @@ -1336,9 +1524,9 @@ private[spark] class DAGScheduler( } /** - * Check [[SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL]] in job properties to see if we should - * interrupt running tasks. Returns `false` if the property value is not a boolean value - */ + * Check [[SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL]] in job properties to see if we should + * interrupt running tasks. Returns `false` if the property value is not a boolean value + */ private def shouldInterruptTaskThread(job: ActiveJob): Boolean = { if (job.properties == null) { false @@ -1357,9 +1545,9 @@ private[spark] class DAGScheduler( } /** - * Responds to a task finishing. This is called inside the event loop so it assumes that it can - * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. - */ + * Responds to a task finishing. This is called inside the event loop so it assumes that it can + * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. + */ private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = { val task = event.task val stageId = task.stageId @@ -1372,7 +1560,7 @@ private[spark] class DAGScheduler( event.reason) if (!stageIdToStage.contains(task.stageId)) { - // The stage may have already finished when we get this event -- eg. maybe it was a + // The stage may have already finished when we get this event -- e.g. maybe it was a // speculative task. It is important that we send the TaskEnd event in any case, so listeners // are properly notified and can chose to handle it. For instance, some listeners are // doing their own accounting and if they don't get the task end event they think @@ -1473,7 +1661,8 @@ private[spark] class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { + if (executorFailureEpoch.contains(execId) && + smt.epoch <= executorFailureEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { // The epoch of the task is acceptable (i.e., the task was launched after the most @@ -1527,7 +1716,7 @@ private[spark] class DAGScheduler( failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + disallowStageRetryForTest // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is @@ -1674,10 +1863,19 @@ private[spark] class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. 
+ val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled + val isHostDecommissioned = taskScheduler + .getExecutorDecommissionState(bmAddress.executorId) + .exists(_.workerHost.isDefined) + + // Shuffle output of all executors on host `bmAddress.host` may be lost if: + // - External shuffle service is enabled, so we assume that all shuffle data on node is + // bad. + // - Host is decommissioned, thus all executors on that host will die. + val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled || + isHostDecommissioned + val hostToUnregisterOutputs = if (shuffleOutputOfEntireHostLost + && unRegisterOutputOnHostOnFetchFailure) { Some(bmAddress.host) } else { // Unregister shuffle data just for one executor (we don't have any @@ -1688,7 +1886,14 @@ private[spark] class DAGScheduler( execId = bmAddress.executorId, fileLost = true, hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) + maybeEpoch = Some(task.epoch), + // shuffleFileLostEpoch is ignored when a host is decommissioned because some + // decommissioned executors on that host might have been removed before this fetch + // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and + // proceed with unconditional removal of shuffle outputs from all executors on that + // host, including from those that we still haven't confirmed as lost due to heartbeat + // delays. + ignoreShuffleFileLostEpoch = isHostDecommissioned) } } @@ -1716,7 +1921,9 @@ private[spark] class DAGScheduler( // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask. val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " + "failed." - taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason) + val job = jobIdToActiveJob.get(failedStage.firstJobId) + val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j)) + taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason) } catch { case e: UnsupportedOperationException => // Cannot continue with barrier stage if failed to cancel zombie barrier tasks. @@ -1731,8 +1938,8 @@ private[spark] class DAGScheduler( // TODO Refactor the failure handling logic to combine similar code with that of // FetchFailed. val shouldAbortStage = - failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || + disallowStageRetryForTest if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { @@ -1778,17 +1985,17 @@ private[spark] class DAGScheduler( handleResubmittedFailure(task, stage) case _: TaskCommitDenied => - // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits + // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits case _: ExceptionFailure | _: TaskKilled => - // Nothing left to do, already handled above for accumulator updates. + // Nothing left to do, already handled above for accumulator updates. case TaskResultLost => - // Do nothing here; the TaskScheduler handles these failures and resubmits the task. + // Do nothing here; the TaskScheduler handles these failures and resubmits the task. case _: ExecutorLostFailure | UnknownReason => - // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler - // will abort the job. + // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler + // will abort the job. 
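The fetch-failure branch above now treats two situations as "the whole host's shuffle output is gone": an external shuffle service running on the node, or a decommissioned host whose executors will all disappear. The following sketch factors just that decision out as a pure function; FetchFailureCtx and its fields are made up for the example, the real code reads SparkEnv, the TaskScheduler decommission state and the unRegisterOutputOnHostOnFetchFailure conf.

```scala
/** Inputs the decision depends on; a stand-in for state the DAGScheduler reads elsewhere. */
final case class FetchFailureCtx(
    externalShuffleServiceEnabled: Boolean,
    hostDecommissioned: Boolean,
    unregisterOutputOnHostOnFetchFailure: Boolean,
    failedHost: String)

object FetchFailureSketch {
  /**
   * Some(host) means "drop shuffle outputs for every executor on that host";
   * None means "drop outputs only for the single failed executor".
   */
  def hostToUnregister(ctx: FetchFailureCtx): Option[String] = {
    val entireHostLost = ctx.externalShuffleServiceEnabled || ctx.hostDecommissioned
    if (entireHostLost && ctx.unregisterOutputOnHostOnFetchFailure) Some(ctx.failedHost)
    else None
  }

  def main(args: Array[String]): Unit = {
    // External shuffle service: all map outputs served from the node are suspect.
    assert(hostToUnregister(FetchFailureCtx(true, false, true, "host-a")).contains("host-a"))
    // Decommissioned host: executors will die, so outputs go away even without a service.
    assert(hostToUnregister(FetchFailureCtx(false, true, true, "host-b")).contains("host-b"))
    // Neither condition: only the failed executor's outputs are unregistered.
    assert(hostToUnregister(FetchFailureCtx(false, false, true, "host-c")).isEmpty)
  }
}
```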
} } @@ -1815,89 +2022,113 @@ private[spark] class DAGScheduler( } /** - * Responds to an executor being lost. This is called inside the event loop, so it assumes it can - * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. - * - * We will also assume that we've lost all shuffle blocks associated with the executor if the - * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave - * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we - * presume all shuffle data related to this executor to be lost. - * - * Optionally the epoch during which the failure was caught can be passed to avoid allowing - * stray fetch failures from possibly retriggering the detection of a node as lost. - */ + * Responds to an executor being lost. This is called inside the event loop, so it assumes it can + * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. + * + * We will also assume that we've lost all shuffle blocks associated with the executor if the + * executor serves its own blocks (i.e., we're not using an external shuffle service), or the + * entire Standalone worker is lost. + */ private[scheduler] def handleExecutorLost( - execId: String, - workerLost: Boolean): Unit = { + execId: String, + workerHost: Option[String]): Unit = { // if the cluster manager explicitly tells us that the entire worker was lost, then // we know to unregister shuffle output. (Note that "worker" specifically refers to the process // from a Standalone cluster, where the shuffle service lives in the Worker.) val remoteShuffleClass = classOf[RemoteShuffleManager].getName val remoteShuffleEnabled = env.conf.get("spark.shuffle.manager") == remoteShuffleClass - // If remote shuffle is enabled, shuffle files will be taken care of by remote storage, the - // unregistering and rerun of certain tasks are not needed. - val fileLost = - !remoteShuffleEnabled && (workerLost || !env.blockManager.externalShuffleServiceEnabled) + val fileLost = !remoteShuffleEnabled && (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) removeExecutorAndUnregisterOutputs( execId = execId, fileLost = fileLost, - hostToUnregisterOutputs = None, + hostToUnregisterOutputs = workerHost, maybeEpoch = None) } + /** + * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle + * outputs for the executor or optionally its host. + * + * @param execId executor to be removed + * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated + * with the executor; this happens if the executor serves its own blocks (i.e., we're not + * using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed + * occurred (in which case we presume all shuffle data related to this executor to be lost). 
+ * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the + * outputs on the host + * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents + * reprocessing for follow-on fetch failures) + */ private def removeExecutorAndUnregisterOutputs( - execId: String, - fileLost: Boolean, - hostToUnregisterOutputs: Option[String], - maybeEpoch: Option[Long] = None): Unit = { + execId: String, + fileLost: Boolean, + hostToUnregisterOutputs: Option[String], + maybeEpoch: Option[Long] = None, + ignoreShuffleFileLostEpoch: Boolean = false): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) - if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { - failedEpoch(execId) = currentEpoch - logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) + logDebug(s"Considering removal of executor $execId; " + + s"fileLost: $fileLost, currentEpoch: $currentEpoch") + if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) { + executorFailureEpoch(execId) = currentEpoch + logInfo(s"Executor lost: $execId (epoch $currentEpoch)") + if (pushBasedShuffleEnabled) { + // Remove fetchFailed host in the shuffle push merger list for push based shuffle + hostToUnregisterOutputs.foreach( + host => blockManagerMaster.removeShufflePushMergerLocation(host)) + } blockManagerMaster.removeExecutor(execId) - if (fileLost) { + clearCacheLocs() + } + if (fileLost) { + val remove = if (ignoreShuffleFileLostEpoch) { + true + } else if (!shuffleFileLostEpoch.contains(execId) || + shuffleFileLostEpoch(execId) < currentEpoch) { + shuffleFileLostEpoch(execId) = currentEpoch + true + } else { + false + } + if (remove) { hostToUnregisterOutputs match { case Some(host) => - logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)") mapOutputTracker.removeOutputsOnHost(host) case None => - logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) + logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)") mapOutputTracker.removeOutputsOnExecutor(execId) } - clearCacheLocs() - - } else { - logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) } } } /** - * Responds to a worker being removed. This is called inside the event loop, so it assumes it can - * modify the scheduler's internal state. Use workerRemoved() to post a loss event from outside. - * - * We will assume that we've lost all shuffle blocks associated with the host if a worker is - * removed, so we will remove them all from MapStatus. - * - * @param workerId identifier of the worker that is removed. - * @param host host of the worker that is removed. - * @param message the reason why the worker is removed. - */ + * Responds to a worker being removed. This is called inside the event loop, so it assumes it can + * modify the scheduler's internal state. Use workerRemoved() to post a loss event from outside. + * + * We will assume that we've lost all shuffle blocks associated with the host if a worker is + * removed, so we will remove them all from MapStatus. + * + * @param workerId identifier of the worker that is removed. + * @param host host of the worker that is removed. + * @param message the reason why the worker is removed. 
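The rewritten removeExecutorAndUnregisterOutputs splits the old failedEpoch map in two: executorFailureEpoch gates removing the executor itself, while shuffleFileLostEpoch gates unregistering its map outputs, and the latter can be bypassed via ignoreShuffleFileLostEpoch when a decommissioned host is involved. A stripped-down sketch of that guard logic, with plain mutable maps standing in for the scheduler state:

```scala
import scala.collection.mutable

object EpochGuardSketch {
  private val executorFailureEpoch = mutable.HashMap[String, Long]()
  private val shuffleFileLostEpoch = mutable.HashMap[String, Long]()

  /** Returns (removeExecutor, unregisterShuffleFiles) for a failure observed at `epoch`. */
  def onExecutorLost(
      execId: String,
      epoch: Long,
      fileLost: Boolean,
      ignoreShuffleFileLostEpoch: Boolean = false): (Boolean, Boolean) = {
    // Remove the executor only the first time we see it fail at this epoch or later.
    val removeExecutor =
      if (executorFailureEpoch.get(execId).forall(_ < epoch)) {
        executorFailureEpoch(execId) = epoch
        true
      } else false

    // Unregister shuffle files at most once per epoch, unless explicitly told to ignore
    // the epoch (e.g. when the whole host is being decommissioned).
    val unregisterFiles = fileLost && {
      if (ignoreShuffleFileLostEpoch) true
      else if (shuffleFileLostEpoch.get(execId).forall(_ < epoch)) {
        shuffleFileLostEpoch(execId) = epoch
        true
      } else false
    }
    (removeExecutor, unregisterFiles)
  }

  def main(args: Array[String]): Unit = {
    assert(onExecutorLost("exec-1", epoch = 5, fileLost = true) == (true, true))
    // A stray, equally old fetch failure for the same executor is ignored on both counts.
    assert(onExecutorLost("exec-1", epoch = 5, fileLost = true) == (false, false))
    // Decommission path: shuffle files are dropped again even though the epoch did not move.
    assert(onExecutorLost("exec-1", epoch = 5, fileLost = true,
      ignoreShuffleFileLostEpoch = true) == (false, true))
  }
}
```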
+ */ private[scheduler] def handleWorkerRemoved( - workerId: String, - host: String, - message: String): Unit = { + workerId: String, + host: String, + message: String): Unit = { logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host)) mapOutputTracker.removeOutputsOnHost(host) clearCacheLocs() } private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = { - // remove from failedEpoch(execId) ? - if (failedEpoch.contains(execId)) { + // remove from executorFailureEpoch(execId) ? + if (executorFailureEpoch.contains(execId)) { logInfo("Host added was in lost list earlier: " + host) - failedEpoch -= execId + executorFailureEpoch -= execId } + shuffleFileLostEpoch -= execId } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = { @@ -1928,12 +2159,12 @@ private[spark] class DAGScheduler( } /** - * Marks a stage as finished and removes it from the list of running stages. - */ + * Marks a stage as finished and removes it from the list of running stages. + */ private def markStageAsFinished( - stage: Stage, - errorMessage: Option[String] = None, - willRetry: Boolean = false): Unit = { + stage: Stage, + errorMessage: Option[String] = None, + willRetry: Boolean = false): Unit = { val serviceTime = stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) case _ => "Unknown" @@ -1960,13 +2191,13 @@ private[spark] class DAGScheduler( } /** - * Aborts all jobs depending on a particular Stage. This is called in response to a task set - * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. - */ + * Aborts all jobs depending on a particular Stage. This is called in response to a task set + * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. + */ private[scheduler] def abortStage( - failedStage: Stage, - reason: String, - exception: Option[Throwable]): Unit = { + failedStage: Stage, + reason: String, + exception: Option[Throwable]): Unit = { if (!stageIdToStage.contains(failedStage.id)) { // Skip all the actions if the stage has been removed. return @@ -2019,9 +2250,9 @@ private[spark] class DAGScheduler( /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */ private def failJobAndIndependentStages( - job: ActiveJob, - failureReason: String, - exception: Option[Throwable] = None): Unit = { + job: ActiveJob, + failureReason: String, + exception: Option[Throwable] = None): Unit = { if (cancelRunningIndependentStages(job, failureReason)) { // SPARK-15783 important to cleanup state first, just for tests where we have some asserts // against the state. Otherwise we have a *little* bit of flakiness in the tests. @@ -2065,30 +2296,30 @@ private[spark] class DAGScheduler( } /** - * Gets the locality information associated with a partition of a particular RDD. - * - * This method is thread-safe and is called from both DAGScheduler and SparkContext. - * - * @param rdd whose partitions are to be looked at - * @param partition to lookup locality information for - * @return list of machines that are preferred by the partition - */ + * Gets the locality information associated with a partition of a particular RDD. + * + * This method is thread-safe and is called from both DAGScheduler and SparkContext. 
+ * + * @param rdd whose partitions are to be looked at + * @param partition to lookup locality information for + * @return list of machines that are preferred by the partition + */ private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { getPreferredLocsInternal(rdd, partition, new HashSet) } /** - * Recursive implementation for getPreferredLocs. - * - * This method is thread-safe because it only accesses DAGScheduler state through thread-safe - * methods (getCacheLocs()); please be careful when modifying this method, because any new - * DAGScheduler state accessed by it may require additional synchronization. - */ + * Recursive implementation for getPreferredLocs. + * + * This method is thread-safe because it only accesses DAGScheduler state through thread-safe + * methods (getCacheLocs()); please be careful when modifying this method, because any new + * DAGScheduler state accessed by it may require additional synchronization. + */ private def getPreferredLocsInternal( - rdd: RDD[_], - partition: Int, - visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { + rdd: RDD[_], + partition: Int, + visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { // If the partition has already been visited, no need to re-visit. // This avoids exponential path exploration. SPARK-695 if (!visited.add((rdd, partition))) { @@ -2150,8 +2381,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer /** - * The main event loop of the DAG scheduler. - */ + * The main event loop of the DAG scheduler. + */ override def onReceive(event: DAGSchedulerEvent): Unit = { val timerContext = timer.time() try { @@ -2184,11 +2415,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => - val workerLost = reason match { - case SlaveLost(_, true) => true - case _ => false + val workerHost = reason match { + case ExecutorProcessLost(_, workerHost, _) => workerHost + case ExecutorDecommission(workerHost) => workerHost + case _ => None } - dagScheduler.handleExecutorLost(execId, workerLost) + dagScheduler.handleExecutorLost(execId, workerHost) case WorkerRemoved(workerId, host, message) => dagScheduler.handleWorkerRemoved(workerId, host, message) @@ -2199,6 +2431,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case SpeculativeTaskSubmitted(task) => dagScheduler.handleSpeculativeTaskSubmitted(task) + case UnschedulableTaskSetAdded(stageId, stageAttemptId) => + dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId) + + case UnschedulableTaskSetRemoved(stageId, stageAttemptId) => + dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId) + case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) diff --git a/shuffle-hadoop/src/main/scala/org/apache/spark/shuffle/remote/RemoteShuffleManager.scala b/shuffle-hadoop/src/main/scala/org/apache/spark/shuffle/remote/RemoteShuffleManager.scala index 3b84e6b1..0a623d79 100644 --- a/shuffle-hadoop/src/main/scala/org/apache/spark/shuffle/remote/RemoteShuffleManager.scala +++ b/shuffle-hadoop/src/main/scala/org/apache/spark/shuffle/remote/RemoteShuffleManager.scala @@ -89,25 +89,6 @@ private[spark] class RemoteShuffleManager(private val conf: SparkConf) extends S * Called on executors by reduce tasks. 
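In the event loop, ExecutorLost no longer carries a boolean workerLost flag; the handler pattern-matches the loss reason and recovers the worker host, treating ExecutorProcessLost and ExecutorDecommission uniformly. A reduced sketch of that match, with a local ADT standing in for Spark's ExecutorLossReason hierarchy:

```scala
// Stand-in for org.apache.spark.scheduler.ExecutorLossReason and its subclasses.
sealed trait LossReason
final case class ProcessLost(message: String, workerHost: Option[String]) extends LossReason
final case class Decommission(workerHost: Option[String]) extends LossReason
case object Unknown extends LossReason

object ExecutorLostSketch {
  /** Mirror of the new dispatch: only these two reasons can name a lost Standalone worker. */
  def workerHostOf(reason: LossReason): Option[String] = reason match {
    case ProcessLost(_, host) => host
    case Decommission(host)   => host
    case _                    => None
  }

  def main(args: Array[String]): Unit = {
    assert(workerHostOf(ProcessLost("worker died", Some("host-a"))).contains("host-a"))
    assert(workerHostOf(Decommission(Some("host-b"))).contains("host-b"))
    assert(workerHostOf(Unknown).isEmpty) // plain executor exit: keep host-level outputs
  }
}
```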
*/ override def getReader[K, C]( - handle: ShuffleHandle, - startPartition: Int, - endPartition: Int, - context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, startPartition, endPartition) - - new RemoteShuffleReader( - handle.asInstanceOf[BaseShuffleHandle[K, _, C]], - shuffleBlockResolver, - blocksByAddress, - context, - metrics, - shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) - } - - override def getReaderForRange[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, @@ -116,7 +97,7 @@ private[spark] class RemoteShuffleManager(private val conf: SparkConf) extends S context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange( + val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) new RemoteShuffleReader( diff --git a/shuffle-hadoop/src/test/scala/org/apache/spark/util/collection/RemoteAppendOnlyMapSuite.scala b/shuffle-hadoop/src/test/scala/org/apache/spark/util/collection/RemoteAppendOnlyMapSuite.scala index 6879d435..533ba798 100644 --- a/shuffle-hadoop/src/test/scala/org/apache/spark/util/collection/RemoteAppendOnlyMapSuite.scala +++ b/shuffle-hadoop/src/test/scala/org/apache/spark/util/collection/RemoteAppendOnlyMapSuite.scala @@ -21,7 +21,6 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.ref.WeakReference -import org.scalatest.Matchers import org.scalatest.concurrent.Eventually import org.apache.spark._ @@ -38,8 +37,7 @@ import org.apache.spark.util.CompletionIterator */ class RemoteAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext - with Eventually - with Matchers{ + with Eventually { import TestUtils.{assertNotSpilled, assertSpilled} private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS @@ -250,7 +248,7 @@ class RemoteAppendOnlyMapSuite extends SparkFunSuite } test("spilling with compression and encryption") { - testSimpleSpilling(Some(CompressionCodec.DEFAULT_COMPRESSION_CODEC), encrypt = true) + testSimpleSpilling(Some(CompressionCodec.FALLBACK_COMPRESSION_CODEC), encrypt = true) } /**
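Both shuffle managers in this patch drop the separate getReaderForRange override: in Spark 3.1 the ShuffleManager exposes a single getReader that takes the map-index range, and MapOutputTracker.getMapSizesByExecutorId gained the matching five-argument form, which is exactly what the surviving override above now calls. The sketch below restates that shape with stand-in types so it compiles on its own; in the real managers the parameters are Spark's ShuffleHandle, TaskContext and ShuffleReadMetricsReporter, and the lookup is SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId feeding RemoteShuffleReader or DaosShuffleReader.

```scala
// Stand-in types for illustration; not the Spark interfaces.
trait Handle { def shuffleId: Int }
final case class BlockLoc(execId: String, blockId: String, length: Long)

object ReaderRangeSketch {
  /** Spark 3.1 style: one reader entry point covering a map-index range plus a partition range. */
  def readerBlocks(
      handle: Handle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      mapSizesByExecutorId: (Int, Int, Int, Int, Int) => Iterator[BlockLoc]): Iterator[BlockLoc] = {
    // One tracker call now serves both the old getReader (whole map range) and the old
    // getReaderForRange (explicit sub-range) code paths.
    mapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
  }

  def main(args: Array[String]): Unit = {
    val fakeTracker: (Int, Int, Int, Int, Int) => Iterator[BlockLoc] =
      (shuffleId, startMap, endMap, startPart, endPart) =>
        Iterator(BlockLoc(s"exec-$startMap", s"shuffle_${shuffleId}_${startMap}_$startPart", 1024L))
    val handle = new Handle { def shuffleId: Int = 7 }

    // Callers that previously used getReader(handle, startPartition, endPartition, ...) now pass
    // the full map range, conventionally startMapIndex = 0 and endMapIndex = Int.MaxValue.
    val blocks = readerBlocks(handle, 0, Int.MaxValue, 2, 3, fakeTracker)
    assert(blocks.next().execId == "exec-0")
  }
}
```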