Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Fix for VolumeMount API #1072 (#1073)
Browse files Browse the repository at this point in the history
* Added support back in for --volume-mount.

 Added test.

Some refactoring, added check for missing volume mounts.

Better error message.

Cleanup.

Fix for compile.

Added a comment.

* Added the Cloudflow CRD. (#1074)

* Cloudflow Custom Resource Definition (CRD).

* Comment to remove endpoint statuses from CRD status, not used.

* Added docs to CRD fields.

* Remove tmp file.
  • Loading branch information
RayRoestenburg authored Jul 23, 2021
1 parent 120cefa commit 6c3f7f5
Show file tree
Hide file tree
Showing 8 changed files with 623 additions and 81 deletions.
75 changes: 0 additions & 75 deletions cloudflow-crd.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
extends Execution[DeployResult]
with WithProtocolVersion
with WithUpdateReplicas
with WithUpdateVolumeMounts
with WithConfiguration {
import DeployExecution._

Expand Down Expand Up @@ -105,15 +106,15 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
.flatten

if (!res.isEmpty) {
val ex: Throwable = res.map(_._2).flatten.headOption.getOrElse(null)
val ex: Throwable = res.flatMap(_._2).headOption.getOrElse(null)
Failure(CliException(res.map(_._1).mkString("\n"), ex))
} else {
Success(())
}
}

private def referencedKafkaSecretExists(appCr: App.Cr, kafkaClusters: () => Try[List[String]]): Try[Unit] = {
val expectedClusters = appCr.spec.deployments.map(_.portMappings.values.map(_.cluster)).flatten.flatten.distinct
val expectedClusters = appCr.spec.deployments.flatMap(_.portMappings.values.map(_.cluster)).flatten.distinct

if (expectedClusters.nonEmpty) {
(for {
Expand Down Expand Up @@ -160,7 +161,11 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
currentAppCr <- client.readCloudflowApp(localApplicationCr.spec.appId)
clusterReplicas = getStreamletsReplicas(currentAppCr)
clusterApplicationCr <- updateReplicas(localApplicationCr, clusterReplicas)
applicationCr <- updateReplicas(clusterApplicationCr, d.scales)
applicationCrReplicas <- updateReplicas(clusterApplicationCr, d.scales)
applicationCr <- updateVolumeMounts(
applicationCrReplicas,
d.volumeMounts,
() => client.getPvcs(namespace = applicationCrReplicas.spec.appId))

image <- getImageReference(applicationCr)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ trait WithConfiguration {

private def validateTopicIds(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
val configTopics = cloudflowConfig.cloudflow.topics.keys.toSeq.distinct
val crTopics = crApp.spec.deployments.map(_.portMappings.values.map(_.id)).flatten.distinct
val crTopics = crApp.spec.deployments.flatMap(_.portMappings.values.map(_.id)).distinct

configTopics.diff(crTopics) match {
case Nil => Success(())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.cli.cloudflow.execution

import akka.cli.cloudflow.CliException
import akka.datap.crd.App

import scala.util.{ Failure, Success, Try }

// This can be deprecated when VolumeMount API is deprecated.
trait WithUpdateVolumeMounts {
def updateVolumeMounts(
crApp: App.Cr,
volumeMountsArgs: Map[String, String],
pvcs: () => Try[List[String]]): Try[App.Cr] = {
for {
streamletVolumeNameToPvc <- streamletVolumeNameToPvcMap(crApp, volumeMountsArgs, pvcs)
_ <- missingStreamletVolumeMountNames(crApp, streamletVolumeNameToPvc.keys)
} yield crApp.copy(spec = crApp.spec.copy(
deployments = updatedDeployments(crApp, streamletVolumeNameToPvc),
streamlets = updatedStreamlets(crApp, streamletVolumeNameToPvc)))
}

private def streamletVolumeNameToPvcMap(
crApp: App.Cr,
volumeMountsArgs: Map[String, String],
pvcs: () => Try[List[String]]): Try[Map[(String, String), String]] = {
for {
existingPvcs <- pvcs()
map <- Try {
volumeMountsArgs.map {
case (streamletVolumeNamePath, pvcName) =>
if (!existingPvcs.contains(pvcName)) {
throw new CliException(
s"Cannot find persistent volume claim '$pvcName' specified via --volume-mount argument.")
}
// volumeMounts Map is "<streamlet-name>.<volume-mount-name>" -> pvc-name
val parts = streamletVolumeNamePath.split("\\.").toList
if (parts.size != 2) {
throw new CliException(
"--volume-mount argument is invalid, please provide as --volume-mount <streamlet-name>.<volume-mount-name>=<pvc-name>")
}
val streamletName = parts(0)
val volumeMountName = parts(1)
if (crApp.spec.deployments.find(_.streamletName == streamletName).isEmpty) {
throw new CliException(s"Cannot find streamlet '$streamletName' in --volume-mount argument")
}

if (crApp.spec.deployments
.filter(_.streamletName == streamletName)
.flatMap(_.volumeMounts)
.find(_.name == volumeMountName)
.isEmpty) {
throw new CliException(
s"Cannot find volume mount name '$volumeMountName' for streamlet '$streamletName' in --volume-mount argument")
}
(streamletName, volumeMountName) -> pvcName
}.toMap
}
} yield map
}

private def missingStreamletVolumeMountNames(
crApp: App.Cr,
streamletVolumeNamesFromArgs: Iterable[(String, String)]): Try[Unit] = {
Try {
val missing = (streamletVolumeMountNamesInCr(crApp) -- streamletVolumeNamesFromArgs.toSet)
.map { case (streamletName, volumeName) => s"$streamletName.$volumeName" }
.toSeq
.sorted
if (missing.nonEmpty) {
def plural = s"""${if (missing.size > 1) "s" else ""}"""
throw new CliException(
s"""Please provide persistent volume name$plural with --volume-mount argument$plural (replace 'pvc-name'$plural with correct value$plural):\n
|${missing.map(m => s"--volume-mount $m=pvc-name").mkString("\n")}
""".stripMargin)
}
()
}
}

private def streamletVolumeMountNamesInCr(crApp: App.Cr): Set[(String, String)] = {
crApp.spec.deployments
.flatMap(deployment => deployment.volumeMounts.map(vm => deployment.streamletName -> vm.name))
.toSet
}

private def updatedDeployments(
crApp: App.Cr,
streamletVolumeNameToPvc: Map[(String, String), String]): Seq[App.Deployment] = {
crApp.spec.deployments.map { deployment =>
deployment.copy(volumeMounts = deployment.volumeMounts.map { vmd =>
streamletVolumeNameToPvc
.get((deployment.streamletName, vmd.name))
.map(pvcName => vmd.copy(pvcName = Some(pvcName)))
.getOrElse(vmd)
})
}
}

private def updatedStreamlets(
crApp: App.Cr,
streamletVolumeNameToPvc: Map[(String, String), String]): Seq[App.Streamlet] = {
crApp.spec.streamlets.map { streamlet =>
streamlet.copy(descriptor = streamlet.descriptor.copy(volumeMounts = streamlet.descriptor.volumeMounts.map {
vmd =>
streamletVolumeNameToPvc
.get((streamlet.name, vmd.name))
.map(pvcName => vmd.copy(pvcName = Some(pvcName)))
.getOrElse(vmd)
}))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class KubeClientFabric8(
val res = models.ApplicationStatus(
summary = getCRSummary(app),
status = appStatus,
// FIXME, remove in a breaking CRD change, the endpoint statuses are not updated anymore.
endpointsStatuses = Try(app.status.endpointStatuses).toOption
.filterNot(_ == null)
.map(_.map(getEndpointStatus))
Expand Down
Loading

0 comments on commit 6c3f7f5

Please sign in to comment.