Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

User defined service-account for the streamlets #1095

Merged
merged 1 commit into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ object OptionsParser {
c.copy(unmanagedRuntimes = c.unmanagedRuntimes ++ r))
.optional()
.text("The runtimes that should not be checked"),
commandParse[commands.Deploy, String](opt("serviceaccount"))((c, sa) => c.copy(serviceAccount = Some(sa)))
.optional()
.text("the serviceaccount to be used"),
commandParse[commands.Deploy, Unit](opt("microservices"))((c, sc) => c.copy(microservices = true))
.optional()
.text("EXPERIMENTAL: Deploy on Akka Cloud Platform"),
Expand Down Expand Up @@ -500,6 +503,7 @@ object commands {
configKeys: Map[String, String] = Map(),
logbackConfig: Option[File] = None,
unmanagedRuntimes: Seq[String] = Seq(),
serviceAccount: Option[String] = None,
microservices: Boolean = false,
output: format.Format = format.Default)
extends Command[DeployResult]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,13 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
}

// prepare the data
localApplicationCr <- loadCrFile(d.crFile)
baseApplicationCr <- loadCrFile(d.crFile)
localApplicationCr = {
d.serviceAccount match {
case Some(sa) => baseApplicationCr.copy(spec = baseApplicationCr.spec.copy(serviceAccount = Some(sa)))
case _ => baseApplicationCr
}
}
namespace = d.namespace.getOrElse(localApplicationCr.spec.appId)

// update the replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,11 @@ class KubeClientFabric8(
def createCloudflowApp(spec: App.Spec, namespace: String): Try[String] =
for {
uid <- createCFApp(spec, namespace)
_ <- createCloudflowServiceAccount(spec.appId, namespace, getOwnerReference(spec.appId, uid))
_ <- {
if (spec.serviceAccount.isEmpty)
createCloudflowServiceAccount(spec.appId, namespace, getOwnerReference(spec.appId, uid))
else Success(())
}
} yield { uid }

def createMicroservicesApp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class ConfigValidationSpec extends AnyFlatSpec with Matchers with TryValues {
agentPaths = Map(),
version = None,
libraryVersion = None,
serviceAccount = None,
streamlets = Seq(
App.Streamlet(
name = "my-streamlet",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class WithUpdateVolumesMountsSpec extends AnyFlatSpec with WithUpdateVolumeMount
agentPaths = Map(),
version = None,
libraryVersion = None,
serviceAccount = None,
streamlets = Seq(
App.Streamlet(
name = streamletName,
Expand Down
4 changes: 3 additions & 1 deletion core/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ object App {
@JsonProperty("version")
version: Option[String],
@JsonProperty("library_version")
libraryVersion: Option[String])
libraryVersion: Option[String],
@JsonProperty("service_account")
serviceAccount: Option[String])
extends KubernetesResource {}

@JsonDeserialize(using = classOf[JsonDeserializer.None])
Expand Down
16 changes: 16 additions & 0 deletions core/cloudflow-it/create-app-serviceaccount.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

SERVICEACCCOUNT=$1
if [ -z "$SERVICEACCCOUNT" ]; then
echo "No serviceaccount name specified."
exit 1
fi

NAMESPACE=$2
if [ -z "$NAMESPACE" ]; then
echo "No namespace name specified."
exit 1
fi

kubectl create serviceaccount "${SERVICEACCCOUNT}" --namespace "${NAMESPACE}"
kubectl create clusterrolebinding "${SERVICEACCCOUNT}-crb" --clusterrole=edit --serviceaccount="${NAMESPACE}:${SERVICEACCCOUNT}"
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ final class AkkaRunner(akkaRunnerDefaults: AkkaRunnerDefaults) extends Runner[De
Action.delete[Deployment](name, namespace)

def appActions(app: App.Cr, labels: CloudflowLabels, ownerReferences: List[OwnerReference]): Seq[Action] = {
val roleAkka = akkaRole(app.namespace, labels, ownerReferences)
Seq(
Action.createOrReplace(roleAkka),
Action.createOrReplace(akkaRoleBinding(app.namespace, roleAkka, labels, ownerReferences)))
app.spec.serviceAccount match {
case Some(_) => Seq()
case _ =>
val roleAkka = akkaRole(app.namespace, labels, ownerReferences)
Seq(
Action.createOrReplace(roleAkka),
Action.createOrReplace(akkaRoleBinding(app.namespace, roleAkka, labels, ownerReferences)))
}
}

case class PatchDeploymentAction(deployment: Deployment)(
Expand Down Expand Up @@ -301,7 +305,7 @@ final class AkkaRunner(akkaRunnerDefaults: AkkaRunnerDefaults) extends Runner[De
configSecretVolumes

val podSpecBuilder = new PodSpecBuilder()
.withServiceAccount(Name.ofServiceAccount)
.withServiceAccount(app.spec.serviceAccount.getOrElse(Name.ofServiceAccount))
.withVolumes(allVolumes.asJava)
.withContainers(container)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner
val parallelism = new AtomicReference(Map[String, Int]()) //flinkRunnerDefaults.parallelism

def appActions(app: App.Cr, labels: CloudflowLabels, ownerReferences: List[OwnerReference]): Seq[Action] = {
val roleFlink = flinkRole(app.namespace, labels, ownerReferences)
Vector(
Action.createOrReplace(roleFlink),
Action.createOrReplace(flinkRoleBinding(app.namespace, roleFlink, labels, ownerReferences)))
app.spec.serviceAccount match {
case Some(_) => Seq()
case _ =>
val roleFlink = flinkRole(app.namespace, labels, ownerReferences)
Vector(
Action.createOrReplace(roleFlink),
Action.createOrReplace(flinkRoleBinding(app.namespace, roleFlink, labels, ownerReferences)))
}
}

def streamletChangeAction(
Expand Down Expand Up @@ -207,7 +211,8 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner
volumeMounts = volumeMounts,
flinkConfig = flinkConfig,
jobManagerConfig = jobManagerConfig,
taskManagerConfig = taskManagerConfig)
taskManagerConfig = taskManagerConfig,
serviceAccountName = app.spec.serviceAccount.getOrElse(Name.ofServiceAccount))

val name = resourceName(deployment)
val appLabels = CloudflowLabels(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner
val ExecutorPod = "executor"

def appActions(app: App.Cr, labels: CloudflowLabels, ownerReferences: List[OwnerReference]): Seq[Action] = {
val roleSpark = sparkRole(app.namespace, labels, ownerReferences)

Seq(
Action.createOrReplace(roleSpark),
Action.createOrReplace(sparkRoleBinding(app.namespace, roleSpark, labels, ownerReferences)))
app.spec.serviceAccount match {
case Some(_) => Seq()
case _ =>
val roleSpark = sparkRole(app.namespace, labels, ownerReferences)
Seq(
Action.createOrReplace(roleSpark),
Action.createOrReplace(sparkRoleBinding(app.namespace, roleSpark, labels, ownerReferences)))
}
}

def streamletChangeAction(
Expand Down Expand Up @@ -241,7 +244,8 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner
volumeMounts = volumeMounts ++ getVolumeMounts(podsConfig, DriverPod),
secrets = secrets,
env = getEnvironmentVariables(podsConfig, DriverPod),
securityContext = securityContext),
securityContext = securityContext,
serviceAccount = Some(app.spec.serviceAccount.getOrElse(Name.ofServiceAccount))),
podsConfig,
deployment)
val executor = addExecutorResourceRequirements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ object CloudflowApplicationSpecBuilder {
streamlets = streamlets,
agentPaths = agentPaths,
version = Some(BuildInfo.version),
libraryVersion = Some(BuildInfo.version))
libraryVersion = Some(BuildInfo.version),
serviceAccount = None)
}

private def toInOutletSchema(schema: SchemaDescriptor) = {
Expand Down