Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Bumped fabric8 kubernetesclient to 5.0.2.
Browse files Browse the repository at this point in the history
  • Loading branch information
RayRoestenburg committed Jan 26, 2022
1 parent ac53960 commit de2d738
Show file tree
Hide file tree
Showing 30 changed files with 164 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ final case class ConfigureExecution(c: Configure, client: KubeClient, logger: Cl
cloudflowConfig,
() => client.getKafkaClusters(namespace = c.operatorNamespace).map(parseValues))

uid <- client.uidCloudflowApp(currentCr.spec.appId, namespace)
uid <- client.uidCloudflowApp(currentCr.getSpec.appId, namespace)
_ <- client.configureCloudflowApp(
currentCr.spec.appId,
currentCr.getSpec.appId,
namespace,
uid,
configStr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
import DeployExecution._

private def applicationDescriptorValidation(crApp: App.Cr): Try[Unit] = {
crApp.spec.version match {
crApp.getSpec.version match {
case None =>
Failure(CliException("Application file parse error: spec.version is missing or empty"))

Expand All @@ -42,7 +42,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
case _ => Failure(CliException("Application file parse error: spec.version is invalid"))
}
libraryVersion <- Try {
val libraryVersion = crApp.spec.libraryVersion.get
val libraryVersion = crApp.getSpec.libraryVersion.get
require { !libraryVersion.contains(' ') }
libraryVersion
}.recoverWith {
Expand Down Expand Up @@ -77,7 +77,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
}

private def referencedKafkaSecretExists(appCr: App.Cr, kafkaClusters: () => Try[List[String]]): Try[Unit] = {
val expectedClusters = appCr.spec.deployments.flatMap(_.portMappings.values.map(_.cluster)).flatten.distinct
val expectedClusters = appCr.getSpec.deployments.flatMap(_.portMappings.values.map(_.cluster)).flatten.distinct

if (expectedClusters.nonEmpty) {
(for {
Expand All @@ -95,11 +95,11 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
}

private def getImageReference(crApp: App.Cr) = {
if (crApp.spec.deployments.size < 1) {
if (crApp.getSpec.deployments.size < 1) {
Failure(CliException("The application specification doesn't contains deployments"))
} else {
// Get the first available image, all images must be present in the same repository.
val imageRef = crApp.spec.deployments(0).image
val imageRef = crApp.getSpec.deployments(0).image

Image(imageRef)
}
Expand All @@ -115,14 +115,14 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
baseApplicationCr <- loadCrFile(d.crFile)
localApplicationCr = {
d.serviceAccount match {
case Some(sa) => baseApplicationCr.copy(spec = baseApplicationCr.spec.copy(serviceAccount = Some(sa)))
case Some(sa) => baseApplicationCr.copy(_spec = baseApplicationCr.getSpec.copy(serviceAccount = Some(sa)))
case _ => baseApplicationCr
}
}
namespace = d.namespace.getOrElse(localApplicationCr.spec.appId)
namespace = d.namespace.getOrElse(localApplicationCr.getSpec.appId)

// update the replicas
currentAppCr <- client.readCloudflowApp(localApplicationCr.spec.appId, namespace)
currentAppCr <- client.readCloudflowApp(localApplicationCr.getSpec.appId, namespace)
clusterReplicas = getStreamletsReplicas(currentAppCr)
clusterApplicationCr <- updateReplicas(localApplicationCr, clusterReplicas)
applicationCrReplicas <- updateReplicas(clusterApplicationCr, d.scales)
Expand Down Expand Up @@ -154,7 +154,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
})

// Operations on the cluster
name = applicationCr.spec.appId
name = applicationCr.getSpec.appId
_ <- client.createNamespace(namespace)
_ <- {
if (d.noRegistryCredentials) Success(())
Expand All @@ -166,7 +166,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
dockerPassword = d.dockerPassword)
}
}
uid <- client.createCloudflowApp(applicationCr.spec, namespace)
uid <- client.createCloudflowApp(applicationCr.getSpec, namespace)
_ <- client.configureCloudflowApp(name, namespace, uid, configStr, logbackContent, streamletsConfigs)
} yield {
logger.trace("Command Deploy executed successfully")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ trait WithConfiguration {

private def validateConfiguredStreamlets(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
val configStreamlets = cloudflowConfig.cloudflow.streamlets.keys.toSeq.distinct
val crStreamlets = crApp.spec.streamlets.map(_.name).toSeq.distinct
val crStreamlets = crApp.getSpec.streamlets.map(_.name).toSeq.distinct

configStreamlets.diff(crStreamlets) match {
case Nil => Success(())
Expand All @@ -99,7 +99,7 @@ trait WithConfiguration {

private def validateTopicIds(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
val configTopics = cloudflowConfig.cloudflow.topics.keys.toSeq.distinct
val crTopics = crApp.spec.deployments.flatMap(_.portMappings.values.map(_.id)).distinct
val crTopics = crApp.getSpec.deployments.flatMap(_.portMappings.values.map(_.id)).distinct

configTopics.diff(crTopics) match {
case Nil => Success(())
Expand Down Expand Up @@ -161,7 +161,7 @@ trait WithConfiguration {

def validateConfigParameters(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
Try {
crApp.spec.streamlets.foreach { streamlet =>
crApp.getSpec.streamlets.foreach { streamlet =>
cloudflowConfig.cloudflow.streamlets.get(streamlet.name).foreach { s =>
val configParameters = streamlet.descriptor.configParameters
val detectedConfigParameters =
Expand Down Expand Up @@ -212,16 +212,16 @@ trait WithConfiguration {
for {
userConfig <- tryUserConfig
cloudflowConfig <- CloudflowConfig.loadAndValidate(userConfig)
defaultConfig = CloudflowConfig.defaultConfig(appCr.spec)
defaultMounts = CloudflowConfig.defaultMountsConfig(appCr.spec, allowedRuntimes = List("flink", "spark"))
defaultConfig = CloudflowConfig.defaultConfig(appCr.getSpec)
defaultMounts = CloudflowConfig.defaultMountsConfig(appCr.getSpec, allowedRuntimes = List("flink", "spark"))
config = userConfig
.withFallback(CloudflowConfig.writeConfig(defaultConfig))
.withFallback(CloudflowConfig.writeConfig(defaultMounts))
.withFallback {
loggingConfig match {
case Some(s) =>
CloudflowConfig.writeConfig(
CloudflowConfig.loggingMountsConfig(appCr.spec, s"${MurmurHash3.stringHash(s)}"))
CloudflowConfig.loggingMountsConfig(appCr.getSpec, s"${MurmurHash3.stringHash(s)}"))
case None => ConfigFactory.empty()
}
}
Expand Down Expand Up @@ -327,7 +327,7 @@ trait WithConfiguration {

val allReferencedClusters = {
appConfig.cloudflow.topics.flatMap { case (_, topic) => topic.cluster } ++
appCr.spec.deployments.flatMap(_.portMappings.values.flatMap(_.cluster)) ++
appCr.getSpec.deployments.flatMap(_.portMappings.values.flatMap(_.cluster)) ++
Seq(DefaultConfigurationName)
}

Expand All @@ -345,10 +345,10 @@ trait WithConfiguration {
}.toMap
}
res <- Try {
appCr.spec.deployments.map { deployment =>
appCr.getSpec.deployments.map { deployment =>
val streamletConfig = CloudflowConfig.streamletConfig(deployment.streamletName, deployment.runtime, appConfig)
val streamletWithPortMappingsConfig = portMappings(deployment, appConfig, streamletConfig, clustersConfig)
val applicationConfig = applicationRunnerConfig(appCr.name, appCr.spec.appVersion, deployment)
val applicationConfig = applicationRunnerConfig(appCr.name, appCr.getSpec.appVersion, deployment)
deployment -> StreamletConfigs(
streamlet = streamletWithPortMappingsConfig,
runtime = CloudflowConfig.runtimeConfig(deployment.streamletName, deployment.runtime, appConfig),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ trait WithUpdateReplicas {
appCr
.map { app =>
(for {
streamlet <- app.spec.deployments
streamlet <- app.getSpec.deployments
} yield {
streamlet.replicas match {
case Some(r) => Some(streamlet.streamletName -> r)
Expand All @@ -27,18 +27,18 @@ trait WithUpdateReplicas {
}

def updateReplicas(crApp: App.Cr, replicas: Map[String, Int]): Try[App.Cr] = {
val allStreamlets = crApp.spec.deployments.map { streamlet => streamlet.streamletName }.distinct
val allStreamlets = crApp.getSpec.deployments.map { streamlet => streamlet.streamletName }.distinct

(replicas.keys.toList.distinct.diff(allStreamlets)) match {
case Nil =>
val clusterDeployments = crApp.spec.deployments.map { streamlet =>
val clusterDeployments = crApp.getSpec.deployments.map { streamlet =>
streamlet.streamletName match {
case sname if replicas.contains(sname) =>
streamlet.copy(replicas = Some(replicas(sname)))
case _ => streamlet
}
}
val res = crApp.copy(spec = crApp.spec.copy(deployments = clusterDeployments))
val res = crApp.copy(_spec = crApp.getSpec.copy(deployments = clusterDeployments))
Success(res)
case missings =>
Failure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait WithUpdateVolumeMounts {
for {
streamletVolumeNameToPvc <- streamletVolumeNameToPvcMap(crApp, volumeMountsArgs, pvcs)
_ <- missingStreamletVolumeMountNames(crApp, streamletVolumeNameToPvc.keys)
} yield crApp.copy(spec = crApp.spec.copy(
} yield crApp.copy(_spec = crApp.getSpec.copy(
deployments = updatedDeployments(crApp, streamletVolumeNameToPvc),
streamlets = updatedStreamlets(crApp, streamletVolumeNameToPvc)))
}
Expand All @@ -44,11 +44,11 @@ trait WithUpdateVolumeMounts {
}
val streamletName = parts(0)
val volumeMountName = parts(1)
if (crApp.spec.deployments.find(_.streamletName == streamletName).isEmpty) {
if (crApp.getSpec.deployments.find(_.streamletName == streamletName).isEmpty) {
throw new CliException(s"Cannot find streamlet '$streamletName' in --volume-mount argument")
}

if (crApp.spec.deployments
if (crApp.getSpec.deployments
.filter(_.streamletName == streamletName)
.flatMap(_.volumeMounts)
.find(_.name == volumeMountName)
Expand Down Expand Up @@ -82,15 +82,15 @@ trait WithUpdateVolumeMounts {
}

private def streamletVolumeMountNamesInCr(crApp: App.Cr): Set[(String, String)] = {
crApp.spec.deployments
crApp.getSpec.deployments
.flatMap(deployment => deployment.volumeMounts.map(vm => deployment.streamletName -> vm.name))
.toSet
}

private def updatedDeployments(
crApp: App.Cr,
streamletVolumeNameToPvc: Map[(String, String), String]): Seq[App.Deployment] = {
crApp.spec.deployments.map { deployment =>
crApp.getSpec.deployments.map { deployment =>
deployment.copy(volumeMounts = deployment.volumeMounts.map { vmd =>
streamletVolumeNameToPvc
.get((deployment.streamletName, vmd.name))
Expand All @@ -103,7 +103,7 @@ trait WithUpdateVolumeMounts {
private def updatedStreamlets(
crApp: App.Cr,
streamletVolumeNameToPvc: Map[(String, String), String]): Seq[App.Streamlet] = {
crApp.spec.streamlets.map { streamlet =>
crApp.getSpec.streamlets.map { streamlet =>
streamlet.copy(descriptor = streamlet.descriptor.copy(volumeMounts = streamlet.descriptor.volumeMounts.map {
vmd =>
streamletVolumeNameToPvc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,17 @@ class KubeClientFabric8(
.find(_.getMetadata.getName == appName)
.getOrElse(throw CliException(s"""Cloudflow application "${appName}" not found"""))

val appStatus: String = Try(app.status.appStatus).toOption.getOrElse("Unknown")
val appStatus: String = Try(app.getStatus.appStatus).toOption.getOrElse("Unknown")

val res = models.ApplicationStatus(
summary = getCRSummary(app),
status = appStatus,
// FIXME, remove in a breaking CRD change, the endpoint statuses are not updated anymore.
endpointsStatuses = Try(app.status.endpointStatuses).toOption
endpointsStatuses = Try(app.getStatus.endpointStatuses).toOption
.filterNot(_ == null)
.map(_.map(getEndpointStatus))
.getOrElse(Seq.empty),
streamletsStatuses = Try(app.status.streamletStatuses).toOption
streamletsStatuses = Try(app.getStatus.streamletStatuses).toOption
.filterNot(_ == null)
.map(_.map(getStreamletStatus))
.getOrElse(Seq.empty))
Expand Down Expand Up @@ -512,7 +512,7 @@ class KubeClientFabric8(
endpointStatuses = Seq(),
streamletStatuses = Seq())

App.Cr(spec = spec, metadata = metadata, status = status)
App.Cr(_spec = spec, _metadata = metadata, _status = status)
}

private def createCFApp(spec: App.Spec, namespace: String): Try[String] =
Expand Down Expand Up @@ -580,7 +580,7 @@ class KubeClientFabric8(
Try {
cloudflowApps
.inNamespace(namespace)
.withName(app.spec.appId)
.withName(app.getSpec.appId)
// NOTE: Patch doesn't work
//.patch(app)
.replace(app)
Expand Down Expand Up @@ -663,7 +663,7 @@ private object ModelConversions {
models.CRSummary(
name = app.name,
namespace = app.namespace,
version = app.spec.appVersion,
version = app.getSpec.appVersion,
creationTime = app.getMetadata.getCreationTimestamp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class ConfigValidationSpec extends AnyFlatSpec with Matchers with TryValues {

def crWithCPDescriptors(cpDescriptors: Seq[App.ConfigParameterDescriptor]) = {
App.Cr(
metadata = null,
spec = App.Spec(
_metadata = null,
_spec = App.Spec(
appId = "",
appVersion = "",
deployments = Seq(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class WithUpdateVolumesMountsSpec extends AnyFlatSpec with WithUpdateVolumeMount
val streamletName = "my-streamlet"
def crWithVolumeMounts(volumeMounts: Seq[App.VolumeMountDescriptor]) = {
App.Cr(
metadata = null,
spec = App.Spec(
_metadata = null,
_spec = App.Spec(
appId = "",
appVersion = "",
deployments = Seq(
Expand Down Expand Up @@ -83,13 +83,13 @@ class WithUpdateVolumesMountsSpec extends AnyFlatSpec with WithUpdateVolumeMount

// Assert
val volumeMountsInDescriptor =
updatedCr.success.value.spec.streamlets
updatedCr.success.value.getSpec.streamlets
.find(_.name == streamletName)
.map(_.descriptor.volumeMounts)
.getOrElse(Seq())
volumeMountsInDescriptor should contain(expectedVolumeMount)
val volumeMountsInDeployment =
updatedCr.success.value.spec.deployments
updatedCr.success.value.getSpec.deployments
.find(_.streamletName == streamletName)
.map(_.volumeMounts)
.getOrElse(Seq())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,8 @@ class CloudflowConfigSpec extends AnyFlatSpec with Matchers with OptionValues wi
val appCr = Serialization.jsonMapper().readValue(crFile, classOf[App.Cr])

// Act
val res = ConfigFactory.empty().withFallback(writeConfig(defaultMountsConfig(appCr.spec, List("flink", "spark"))))
val res =
ConfigFactory.empty().withFallback(writeConfig(defaultMountsConfig(appCr.getSpec, List("flink", "spark"))))

def getPvcPath(runtime: String) =
s"cloudflow.runtimes.$runtime.kubernetes.pods.pod.volumes.default.pvc.name"
Expand Down
19 changes: 10 additions & 9 deletions core/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,19 @@ object App {
@Kind(Kind)
@Plural(Plural)
final case class Cr(
@JsonProperty("spec")
spec: Spec,
_spec: Spec,
@JsonProperty("metadata")
metadata: ObjectMeta,
@JsonProperty("status")
status: AppStatus = null)
_metadata: ObjectMeta,
_status: AppStatus = null)
extends CustomResource[Spec, AppStatus]
with Namespaced {
this.setMetadata(metadata)

def name: String = metadata.getName()
def namespace: String = metadata.getNamespace()
this.setMetadata(_metadata)
this.setSpec(_spec)
this.setStatus(_status)
override def initSpec = _spec
override def initStatus = _status
def name: String = getMetadata.getName()
def namespace: String = getMetadata.getNamespace()
}

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object Actions {
podName: String,
podNamespace: String,
cause: ObjectReference): Seq[Action] = {
require(currentApp.forall(_.spec.appId == newApp.spec.appId))
require(currentApp.forall(_.getSpec.appId == newApp.getSpec.appId))
val labels = CloudflowLabels(newApp)
val ownerReferences = List(AppOwnerReference(newApp.name, newApp.getMetadata.getUid))
prepareNamespace(newApp, runners, labels, ownerReferences) ++
Expand All @@ -52,7 +52,7 @@ object Actions {
// If an existing status is there, update status based on app (expected pod counts)
// in case pod events do not occur, for instance when a operator delegated to is not responding
Option(newApp.getStatus).flatMap { st =>
val newStatus = CloudflowStatus.updateApp(newApp, runners).status
val newStatus = CloudflowStatus.updateApp(newApp, runners).getStatus
if (newStatus != st) Some(CloudflowStatus.statusUpdateAction(newApp)())
else None
}.toList ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object CloudflowLabels {
val ConfigUpdateLabel = "com.lightbend.cloudflow/config-update"

def apply(app: App.Cr): CloudflowLabels =
CloudflowLabels(app.spec.appId, app.spec.appVersion)
CloudflowLabels(app.getSpec.appId, app.getSpec.appVersion)

// The name of the application
val Name = "app.kubernetes.io/name"
Expand Down
Loading

0 comments on commit de2d738

Please sign in to comment.