[SPARK-24248][K8S] Use level triggering and state reconciliation in scheduling and lifecycle #21366
Changes from all commits
New file (28 lines): the jMock license text.

```
Copyright (c) 2000-2017, jMock.org
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer. Redistributions
in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.

Neither the name of jMock nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
```
Changes to the K8s `Config` object (`@@ -176,6 +176,24 @@ private[spark] object Config extends Logging`), adding two new time-interval settings:

```scala
      .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
      .createWithDefaultString("1s")

  val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
    ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
      .doc("Interval between polls against the Kubernetes API server to inspect the " +
        "state of executors.")
      .timeConf(TimeUnit.MILLISECONDS)
      .checkValue(interval => interval > 0, s"API server polling interval must be a" +
        " positive time value.")
      .createWithDefaultString("30s")

  val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
    ConfigBuilder("spark.kubernetes.executor.eventProcessingInterval")
      .doc("Interval between successive inspection of executor events sent from the" +
        " Kubernetes API.")
      .timeConf(TimeUnit.MILLISECONDS)
      .checkValue(interval => interval > 0, s"Event processing interval must be a positive" +
        " time value.")
      .createWithDefaultString("1s")

  val MEMORY_OVERHEAD_FACTOR =
    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
```

A second hunk (`@@ -193,7 +211,6 @@`) removes a stray blank line near:

```scala
        "Ensure that major Python version is either Python2 or Python3")
      .createWithDefault("2")

  val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
    "spark.kubernetes.authenticate.submission"
```

Review thread on `spark.kubernetes.executor.eventProcessingInterval`:

**Contributor:** I think this option is hard to reason about, and it relies on understanding an implementation detail (the event queue). Why not just pick a default and leave it at that? In what scenario do we see the user trying to choose this value?

**Author:** I think it's fine to leave this configurable but not to document it. It would give users an escape hatch if, for whatever reason, they do need to adjust the timing.

**Contributor:** @mccheah because this is not internal... shouldn't we include this in

**Author:** This should have been marked with
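For illustration, the two new intervals can be tuned like any other Spark setting. A hypothetical `spark-defaults.conf` fragment (the values shown are arbitrary examples, not recommendations from this PR):

```properties
# Poll the Kubernetes API for executor pod state every 60s instead of the 30s default.
spark.kubernetes.executor.apiPollingInterval       60s
# Drain and process buffered executor pod events every 2s instead of the 1s default.
spark.kubernetes.executor.eventProcessingInterval  2s
```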
New file: `ExecutorPodState.scala` (37 lines), a sealed trait hierarchy modeling the lifecycle states of an executor pod:

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.Pod

sealed trait ExecutorPodState {
  def pod: Pod
}

case class PodRunning(pod: Pod) extends ExecutorPodState

case class PodPending(pod: Pod) extends ExecutorPodState

sealed trait FinalPodState extends ExecutorPodState

case class PodSucceeded(pod: Pod) extends FinalPodState

case class PodFailed(pod: Pod) extends FinalPodState

case class PodDeleted(pod: Pod) extends FinalPodState

case class PodUnknown(pod: Pod) extends ExecutorPodState
```
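To illustrate how such an ADT is meant to be consumed, here is a hedged, self-contained sketch of classifying a pod into one of these states. The `Pod` stand-in and the `classify` helper are ours, not the PR's: the real code uses the fabric8 `Pod` model, and its actual mapping (including how deletion is detected, likely by a pod disappearing from a snapshot rather than by a phase string) may differ.

```scala
// Simplified stand-in for io.fabric8.kubernetes.api.model.Pod: only the phase matters here.
case class Pod(phase: String)

sealed trait ExecutorPodState { def pod: Pod }
case class PodRunning(pod: Pod) extends ExecutorPodState
case class PodPending(pod: Pod) extends ExecutorPodState
sealed trait FinalPodState extends ExecutorPodState
case class PodSucceeded(pod: Pod) extends FinalPodState
case class PodFailed(pod: Pod) extends FinalPodState
case class PodUnknown(pod: Pod) extends ExecutorPodState

object ExecutorPodStates {
  // Hypothetical mapping from a Kubernetes pod phase to a state; illustrative only.
  def classify(pod: Pod): ExecutorPodState = pod.phase.toLowerCase match {
    case "running"   => PodRunning(pod)
    case "pending"   => PodPending(pod)
    case "succeeded" => PodSucceeded(pod)
    case "failed"    => PodFailed(pod)
    case _           => PodUnknown(pod)
  }
}
```

Because `FinalPodState` is a sealed sub-trait, callers can detect terminal pods with a single match, e.g. `classify(pod) match { case _: FinalPodState => ... }`, without enumerating every terminal case.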
New file: `ExecutorPodsAllocator.scala` (149 lines). The standard ASF license header, identical to the one above, is omitted here.

```scala
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, Utils}

private[spark] class ExecutorPodsAllocator(
    conf: SparkConf,
    executorBuilder: KubernetesExecutorBuilder,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    clock: Clock) extends Logging {

  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)

  private val totalExpectedExecutors = new AtomicInteger(0)

  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)

  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)

  private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000)

  private val kubernetesDriverPodName = conf
    .get(KUBERNETES_DRIVER_POD_NAME)
    .getOrElse(throw new SparkException("Must specify the driver pod name"))

  private val driverPod = kubernetesClient.pods()
    .withName(kubernetesDriverPodName)
    .get()

  // Executor IDs that have been requested from Kubernetes but have not been detected in any
  // snapshot yet. Mapped to the timestamp when they were created.
  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]

  def start(applicationId: String): Unit = {
    snapshotsStore.addSubscriber(podAllocationDelay) {
      onNewSnapshots(applicationId, _)
    }
  }

  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)

  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
    newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
    // For all executors we've created against the API but have not seen in a snapshot
    // yet - check the current time. If the current time has exceeded some threshold,
    // assume that the pod was either never created (the API server never properly
    // handled the creation request), or the API server created the pod but we missed
    // both the creation and deletion events. In either case, delete the missing pod
    // if possible, and mark such a pod to be rescheduled below.
    newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
      val currentTime = clock.getTimeMillis()
      if (currentTime - timeCreated > podCreationTimeout) {
        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
          s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
          " previous allocation attempt tried to create it. The executor may have been" +
          " deleted but the application missed the deletion event.")
        Utils.tryLogNonFatalError {
          kubernetesClient
            .pods()
            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
            .delete()
        }
        newlyCreatedExecutors -= execId
      } else {
        logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
          s" was created ${currentTime - timeCreated} milliseconds ago.")
      }
    }

    if (snapshots.nonEmpty) {
      // Only need to examine the cluster as of the latest snapshot, the "current" state, to see
      // if we need to allocate more executors or not.
      val latestSnapshot = snapshots.last
      val currentRunningExecutors = latestSnapshot.executorPods.values.count {
        case PodRunning(_) => true
        case _ => false
      }
      val currentPendingExecutors = latestSnapshot.executorPods.values.count {
        case PodPending(_) => true
        case _ => false
      }
      val currentTotalExpectedExecutors = totalExpectedExecutors.get
      logDebug(s"Currently have $currentRunningExecutors running executors and" +
        s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
        s" have been requested but are pending appearance in the cluster.")
      if (newlyCreatedExecutors.isEmpty
          && currentPendingExecutors == 0
          && currentRunningExecutors < currentTotalExpectedExecutors) {
        val numExecutorsToAllocate = math.min(
          currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
        logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
        for (_ <- 0 until numExecutorsToAllocate) {
          val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
          val executorConf = KubernetesConf.createExecutorConf(
            conf,
            newExecutorId.toString,
            applicationId,
            driverPod)
          val executorPod = executorBuilder.buildFromFeatures(executorConf)
          val podWithAttachedContainer = new PodBuilder(executorPod.pod)
            .editOrNewSpec()
            .addToContainers(executorPod.container)
            .endSpec()
            .build()
          kubernetesClient.pods().create(podWithAttachedContainer)
          newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
          logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
        }
      } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
        // TODO handle edge cases if we end up with more running executors than expected.
        logDebug("Current number of running executors is equal to the number of requested" +
          " executors. Not scaling up further.")
      } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
        logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
          s" executors to begin running before requesting for more executors. # of executors in" +
          s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
          s" created but we have not observed as being present in the cluster yet:" +
          s" ${newlyCreatedExecutors.size}.")
      }
    }
  }
}
```

Review thread on the `delete()` call in the pod-creation-timeout branch:

**Contributor:** Shouldn't removeExecutorFromSpark be called here as well? Couldn't it be the case that the executor exists at a higher level but the K8s backend missed it?

**Author:** That's handled by the lifecycle manager already: the lifecycle manager looks at what the scheduler backend believes are its executors and reconciles them with what's in the snapshot.
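The scale-up decision inside `onNewSnapshots` reduces to a small piece of arithmetic: request more pods only when nothing is already in flight, and never more than one batch at a time. A self-contained sketch of just that guard (the object and parameter names are ours, not the PR's):

```scala
object AllocatorMath {
  // Mirrors the guard in onNewSnapshots: allocate only when no pod is newly created
  // or pending, and cap each round at the configured batch size.
  def podsToRequest(
      expected: Int,      // totalExpectedExecutors
      running: Int,       // currentRunningExecutors
      pending: Int,       // currentPendingExecutors
      newlyCreated: Int,  // newlyCreatedExecutors.size
      batchSize: Int): Int = {
    if (newlyCreated == 0 && pending == 0 && running < expected) {
      math.min(expected - running, batchSize)
    } else {
      0
    }
  }
}
```

For example, with 10 expected executors, 2 running, nothing pending, and a batch size of 5, one round requests 5 pods; with 8 already running, the next round requests only the remaining 2.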
Review thread on the `pom.xml` change that adds rxjava:

**Contributor:** Why add this to the top-level pom?

**Author:** We always add to the top level, and then in the lower-level poms we reference the dependent modules without listing their versions.

**Contributor:** I'm a bit concerned about adding rxjava to the top-level pom and to `dev/deps/spark-deps-hadoop-*`. Can it be just a version property (a `<arrow.version>0.8.0</arrow.version>`-style entry) and not a dependency? It might possibly conflict with calling Spark from the Reactive Streams stack. @skonto what do you think?

**Contributor:** Also, the added dependency should have its LICENSE added under `/license`.

**Author:** Unsure what you mean here - we're using rxjava itself specifically to do the event handling in this PR. See https://github.com/apache/spark/pull/21366/files#diff-ae4cd884779fb4c3db58958ab984db59R40. If we wanted an alternative approach, we could build something from first principles (an executor service plus manual linked blocking queues), but I like the elegance that rxjava buys us here. The code we'd save by not building this ourselves seems worthwhile.

**Contributor:** It depends on the reactive-streams library, so you don't need to bring rxjava in. @ktoso correct me if I am wrong. Do you have an example of a specific controller, to get a better understanding?

**Contributor (@ktoso):** Akka Streams does not depend on Rx, of course; they are both alternative implementations of Reactive Streams (http://reactive-streams.org/), which has been included in JDK 9 as `java.util.concurrent.Flow.*`. Akka also implements those interfaces but does not require JDK 9: you can use JDK 8 + RS, and if you use JDK 9 you can use the JDK's types, though that is not required. Both Akka and Rx implement the respective interfaces in RS / the JDK, so they can interoperate thanks to that (see the RS site for details). Anything else I should clarify or review here? For interop purposes it would be good not to expose a specific implementation but to expose the reactive-streams types (`org.reactivestreams.Publisher` etc.), though that only matters if the types are exposed at all. As for including dependencies in core Spark: I would expect this to carry quite a few implications, though I don't know Spark's rules about it (of course, fewer dependencies are better for users, since there are fewer chances to version-clash with the libraries they use).

**Author:** These types are not exposed - they're only implementation details in the Kubernetes module. Furthermore, the RxJava dependency will be in the Spark distribution but is not a dependency pulled in by spark-core. It sounds like there is some contention over the extra dependency, though, so should we be considering implementing our own mechanisms from the ground up? The bottom-line question is: can spark-kubernetes, NOT spark-core, pull in RxJava?

**Author:** I ended up removing reactive programming entirely - the buffering is implemented manually. Please take a look.

**Contributor:** Thanx @ktoso!
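The "buffering is implemented manually" approach the author settled on could look roughly like the following sketch: producers enqueue events, and a single scheduled task drains the queue at a fixed interval and hands each non-empty batch to a subscriber. This is our own minimal reconstruction, not the PR's actual `ExecutorPodsSnapshotsStore` code; the class and method names are hypothetical.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical buffered event bus: publish() is non-blocking, and a background
// task delivers accumulated events in batches every `intervalMillis`.
class BufferedEventBus[T](intervalMillis: Long)(subscriber: Seq[T] => Unit) {
  private val queue = new ConcurrentLinkedQueue[T]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def publish(event: T): Unit = queue.add(event)

  def start(): Unit = {
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        // Drain everything currently queued into one batch.
        val batch = new ArrayBuffer[T]()
        var next = queue.poll()
        while (next != null) {
          batch += next
          next = queue.poll()
        }
        if (batch.nonEmpty) subscriber(batch.toSeq)
      }
    }, 0L, intervalMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdownNow()
}
```

Compared with rxjava's buffered observables, this trades a library dependency for roughly thirty lines of executor-service code, which matches the direction the thread converged on.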