
Commit 92b61e0

WIP renewal service with specified contract
1 parent 68dde34 commit 92b61e0


11 files changed (+178, -32 lines)


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 2 additions & 1 deletion
@@ -335,7 +335,8 @@ private[spark] class SparkSubmit extends Logging {
     val targetDir = Utils.createTempDir()

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
+    if (clusterManager == YARN || clusterManager == LOCAL ||
+        clusterManager == KUBERNETES || isMesosClient) {
       if (args.principal != null) {
         if (args.keytab != null) {
           require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 4 additions & 4 deletions
@@ -70,8 +70,8 @@ private[spark] class HadoopDelegationTokenManager(
     "spark.yarn.security.credentials.%s.enabled")
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

-  private val principal = sparkConf.get(PRINCIPAL).orNull
-  private val keytab = sparkConf.get(KEYTAB).orNull
+  protected val principal = sparkConf.get(PRINCIPAL).orNull
+  protected val keytab = sparkConf.get(KEYTAB).orNull

   require((principal == null) == (keytab == null),
     "Both principal and keytab must be defined, or neither.")
@@ -81,8 +81,8 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug("Using the following builtin delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")

-  private var renewalExecutor: ScheduledExecutorService = _
-  private val driverRef = new AtomicReference[RpcEndpointRef]()
+  protected var renewalExecutor: ScheduledExecutorService = _
+  protected val driverRef = new AtomicReference[RpcEndpointRef]()

   /** Set the endpoint used to send tokens to the driver. */
   def setDriverRef(ref: RpcEndpointRef): Unit = {

docs/running-on-kubernetes.md

Lines changed: 11 additions & 2 deletions
@@ -884,15 +884,24 @@ specific to Spark on Kubernetes.
   <td>(none)</td>
   <td>
     Specify the local file that contains the driver [pod template](#pod-template). For example
-    <code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml`</code>
+    <code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.executor.podTemplateFile</code></td>
   <td>(none)</td>
   <td>
     Specify the local file that contains the executor [pod template](#pod-template). For example
-    <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`</code>
+    <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokenSecret.renewal</code></td>
+  <td>false</td>
+  <td>
+    Enables the driver to watch the secret specified at
+    <code>spark.kubernetes.kerberos.tokenSecret.name</code> for updates so that the tokens can be
+    propagated to the executors.
   </td>
 </tr>
 </table>
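
For illustration only, a minimal sketch of enabling this watcher together with the secret it should track when building the submission-side configuration; the secret name below is a placeholder, not something defined by this commit:

```scala
import org.apache.spark.SparkConf

// Sketch: point the driver at the externally managed token secret and turn on the watcher.
// "spark-dt-secret" is a placeholder name.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.tokenSecret.name", "spark-dt-secret")
  .set("spark.kubernetes.kerberos.tokenSecret.renewal", "true")
```

With both set, the driver is expected to start the secret watcher described in `docs/security.md` below rather than relying on a keytab-based renewal thread.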

docs/security.md

Lines changed: 44 additions & 16 deletions
@@ -706,22 +706,6 @@ The following options provides finer-grained control for this feature:
 </tr>
 </table>

-## Long-Running Applications
-
-Long-running applications may run into issues if their run time exceeds the maximum delegation
-token lifetime configured in services it needs to access.
-
-Spark supports automatically creating new tokens for these applications when running in YARN mode.
-Kerberos credentials need to be provided to the Spark application via the `spark-submit` command,
-using the `--principal` and `--keytab` parameters.
-
-The provided keytab will be copied over to the machine running the Application Master via the Hadoop
-Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
-with encryption, at least.
-
-The Kerberos login will be periodically renewed using the provided credentials, and new delegation
-tokens for supported will be created.
-
 ## Secure Interaction with Kubernetes

 When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
@@ -798,6 +782,50 @@ achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing Co
     local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
     <HDFS_FILE_LOCATION>
 ```
+
+## Long-Running Applications
+
+Long-running applications may run into issues if their run time exceeds the maximum delegation
+token lifetime configured in the services they need to access.
+
+Spark supports automatically creating new tokens for these applications when running in YARN, Mesos, and Kubernetes modes.
+If one wishes to launch the renewal thread in the driver, Kerberos credentials need to be provided to the Spark application
+via the `spark-submit` command, using the `--principal` and `--keytab` parameters.
+
+The provided keytab will be copied over to the machine running the Application Master via the Hadoop
+Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
+with encryption, at least.
+
+The Kerberos login will be periodically renewed using the provided credentials, and new delegation
+tokens for supported services will be created.
+
+#### Long-Running Kerberos in Kubernetes
+
+This section covers an additional feature unique to Kubernetes. If you run an external token service that
+updates the secret containing the delegation tokens used by both the driver and the executors, the driver
+propagates those updates to the executors through a Watcher thread. This Watcher thread is launched only
+when the `spark.kubernetes.kerberos.tokenSecret.renewal` config is enabled, and it is responsible for
+detecting updates to the secret defined at `spark.kubernetes.kerberos.tokenSecret.name`.
+
+The contract between an external token service and this secret is that the secret must be defined with the
+following specification:
+
+```yaml
+kind: Secret
+metadata:
+  name: YOUR_SECRET_NAME
+  namespace: YOUR_NAMESPACE
+type: Opaque
+data:
+  spark.kubernetes.dt-CREATION_TIME-RENEWAL_TIME: YOUR_TOKEN_DATA
+```
+
+where `YOUR_SECRET_NAME` is the value of `spark.kubernetes.kerberos.tokenSecret.name`, `YOUR_NAMESPACE` is the
+namespace in which the driver and executors run, `CREATION_TIME` and `RENEWAL_TIME` are UNIX timestamps marking
+when the token was created and when it should next be renewed, respectively, and `YOUR_TOKEN_DATA` is
+Base64-encoded data containing your delegation tokens. The driver's Watcher thread will automatically pick up
+data that follows this specification and select the most recent token based on `CREATION_TIME`.
+
 # Event Logging

 If your applications are using event logging, the directory where the event logs go
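
To make the secret contract above concrete, here is a minimal, hypothetical sketch of how an external token service might publish a token into the watched secret using the fabric8 client. The secret name, namespace, renewal interval, and the empty credentials object are placeholders and are not part of this commit:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.Base64

import io.fabric8.kubernetes.api.model.SecretBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.hadoop.security.Credentials

object PublishTokenSecretSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder credentials; a real token service would populate this with fresh
    // delegation tokens before serializing it.
    val credentials = new Credentials()
    val byteStream = new ByteArrayOutputStream()
    credentials.writeTokenStorageToStream(new DataOutputStream(byteStream))

    // Key follows the spark.kubernetes.dt-CREATION_TIME-RENEWAL_TIME contract (epoch seconds).
    val creationTime = System.currentTimeMillis() / 1000
    val renewalTime = creationTime + 24 * 60 * 60
    val dataKey = s"spark.kubernetes.dt-$creationTime-$renewalTime"

    val secretName = "spark-dt-secret"   // value of spark.kubernetes.kerberos.tokenSecret.name
    val namespace = "spark-jobs"         // namespace of the driver and executors

    val secret = new SecretBuilder()
      .withNewMetadata()
        .withName(secretName)
        .withNamespace(namespace)
      .endMetadata()
      .withType("Opaque")
      // Secret `data` values are Base64-encoded, matching what the driver-side watcher decodes.
      .addToData(dataKey, Base64.getEncoder.encodeToString(byteStream.toByteArray))
      .build()

    val client = new DefaultKubernetesClient()
    try {
      client.secrets().inNamespace(namespace).createOrReplace(secret)
    } finally {
      client.close()
    }
  }
}
```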

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 9 additions & 0 deletions
@@ -262,6 +262,15 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

+  val KUBERNETES_KERBEROS_DT_SECRET_RENEWAL =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.renewal")
+      .doc("Enables the driver to watch the secret specified at " +
+        "spark.kubernetes.kerberos.tokenSecret.name for updates so that the " +
+        "tokens can be propagated to the executors.")
+      .booleanConf
+      .createWithDefault(false)
+
+
   val APP_RESOURCE_TYPE =
     ConfigBuilder("spark.kubernetes.resource.type")
       .doc("This sets the resource type internally")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 1 addition & 0 deletions
@@ -109,6 +109,7 @@ private[spark] object Constants {
   val KERBEROS_SPARK_USER_NAME =
     "spark.kubernetes.kerberos.spark-user-name"
   val KERBEROS_SECRET_KEY = "hadoop-tokens"
+  val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"

   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"

   def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
-    new KubernetesHadoopDelegationTokenManager(conf, hConf)
+    new KubernetesHadoopDelegationTokenManager(conf, hConf, None)

   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala

Lines changed: 86 additions & 2 deletions
@@ -17,21 +17,105 @@

 package org.apache.spark.deploy.k8s.security

+import java.io.{ByteArrayInputStream, DataInputStream}
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Secret
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}

 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

 /**
  * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
  */
 private[spark] class KubernetesHadoopDelegationTokenManager(
     _sparkConf: SparkConf,
-    _hadoopConf: Configuration)
+    _hadoopConf: Configuration,
+    kubernetesClient: Option[KubernetesClient])
   extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {

   def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
   def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

+  private val isTokenRenewalEnabled =
+    _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)
+
+  private val dtSecretName = _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
+
+  if (isTokenRenewalEnabled) {
+    require(dtSecretName.isDefined,
+      "Must specify the token secret which the driver must watch for updates")
+  }
+
+  private def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
+    val byteStream = new ByteArrayInputStream(data)
+    val dataStream = new DataInputStream(byteStream)
+    credentials.readTokenStorageStream(dataStream)
+  }
+
+  private var watch: Watch = _
+
+  /**
+   * As in HadoopDelegationTokenManager, this starts the token renewer.
+   * Upon start, if a principal and keytab are defined, the renewer will:
+   *
+   * - log in the configured principal, and set up a task to keep that user's ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - send the tokens to the driver, if it's already registered
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * If the principal is NOT configured, a long-running app can still be serviced by enabling
+   * the KUBERNETES_KERBEROS_DT_SECRET_RENEWAL config and relying on an external service to
+   * populate a secret with valid delegation tokens that the application will then use. This
+   * is done via a Secret watcher, which the driver leverages to detect updates to the secret,
+   * retrieve the secret's contents, and send them to the executors before their tokens expire.
+   *
+   * @return The newly logged in user, or null
+   */
+  override def start(): UserGroupInformation = {
+    val driver = driverRef.get()
+    if (isTokenRenewalEnabled &&
+      kubernetesClient.isDefined && driver != null) {
+      watch = kubernetesClient.get
+        .secrets()
+        .inNamespace(_sparkConf.get(KUBERNETES_NAMESPACE))
+        .withName(dtSecretName.get)
+        .watch(new Watcher[Secret] {
+          override def onClose(cause: KubernetesClientException): Unit =
+            logInfo("Ending the watch of DT Secret")
+          override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
+            action match {
+              case Action.ADDED | Action.MODIFIED =>
+                logInfo("Secret update")
+                val dataItems = resource.getData.asScala.filterKeys(
+                  _.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS)).toSeq.sorted
+                val latestToken = if (dataItems.nonEmpty) Some(dataItems.max) else None
+                latestToken.foreach {
+                  case (_, data) =>
+                    val credentials = new Credentials
+                    deserialize(credentials, Base64.decodeBase64(data))
+                    val tokens = SparkHadoopUtil.get.serialize(credentials)
+                    driver.send(UpdateDelegationTokens(tokens))
+                }
+              case _ => // ignore DELETED and ERROR events
+            }
+          }
+        })
+      null
+    } else {
+      super.start()
+    }
+  }
 }
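
For reference, a small self-contained sketch (the key names and timestamps are invented) of the ordering assumption the watcher above relies on: with fixed-width epoch-second timestamps, lexicographic order of the `spark.kubernetes.dt-` keys tracks creation order, so taking the maximum yields the most recently created token.

```scala
// Not part of the commit: illustrates selecting the newest token entry from secret data
// that follows the spark.kubernetes.dt-CREATION_TIME-RENEWAL_TIME naming contract.
object LatestTokenSelectionExample {
  private val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"

  def main(args: Array[String]): Unit = {
    // Hypothetical secret data: two token entries plus an unrelated key that must be ignored.
    val secretData = Map(
      "spark.kubernetes.dt-1546300800-1546387200" -> "older-token-base64",
      "spark.kubernetes.dt-1546387200-1546473600" -> "newer-token-base64",
      "krb5.conf" -> "unrelated")

    val dataItems = secretData
      .filterKeys(_.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS))
      .toSeq
      .sorted
    val latestToken = if (dataItems.nonEmpty) Some(dataItems.max) else None

    latestToken.foreach { case (key, data) =>
      // Prints the entry with CREATION_TIME 1546387200, i.e. the newest token.
      println(s"Would decode and forward: $key -> $data")
    }
  }
}
```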

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
-      sc.env.rpcEnv,
+      sc,
       kubernetesClient,
       requestExecutorsService,
       snapshotsStore,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 3 deletions
@@ -21,23 +21,26 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}

+import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}

 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
-    rpcEnv: RpcEnv,
+    sc: SparkContext,
     kubernetesClient: KubernetesClient,
     requestExecutorsService: ExecutorService,
     snapshotsStore: ExecutorPodsSnapshotsStore,
     podAllocator: ExecutorPodsAllocator,
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
-  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     requestExecutorsService)
@@ -123,7 +126,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }

   override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new KubernetesDriverEndpoint(rpcEnv, properties)
+    new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new KubernetesHadoopDelegationTokenManager(conf,
+      sc.hadoopConfiguration,
+      Some(kubernetesClient)))
   }

   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

Lines changed: 7 additions & 2 deletions
@@ -23,7 +23,7 @@ import org.mockito.Matchers.{eq => mockitoEq}
 import org.mockito.Mockito.{never, verify, when}
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -44,6 +44,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   @Mock
   private var rpcEnv: RpcEnv = _

+  @Mock
+  private var scEnv: SparkEnv = _
+
   @Mock
   private var driverEndpointRef: RpcEndpointRef = _

@@ -82,13 +85,15 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(taskScheduler.sc).thenReturn(sc)
     when(sc.conf).thenReturn(sparkConf)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
+    when(sc.env).thenReturn(scEnv)
+    when(scEnv.rpcEnv).thenReturn(rpcEnv)
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
-      rpcEnv,
+      sc,
       kubernetesClient,
       requestExecutorsService,
       eventQueue,