From e09e5b4addd916d9da0464734b19d1540fd851de Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Mon, 5 Apr 2021 16:41:03 +0100 Subject: [PATCH 01/17] Runner config from the CLI --- .../cloudflow/localrunner/LocalRunner.scala | 47 ++++++++++++++- .../runner/RunnerConfigResolver.scala | 16 +++-- .../blueprint/deployment/RunnerConfig.scala | 46 +-------------- .../execution/WithConfiguration.scala | 59 ++++++++++++++++++- .../operator/action/runner/Runner.scala | 3 +- 5 files changed, 114 insertions(+), 57 deletions(-) diff --git a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala index 42068d22c..ad38800ff 100644 --- a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala +++ b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala @@ -27,8 +27,9 @@ import scala.util.control.NonFatal import com.typesafe.config.{ Config, ConfigFactory } import org.slf4j.LoggerFactory import spray.json._ -import cloudflow.blueprint.deployment.{ ApplicationDescriptor, RunnerConfig, StreamletDeployment, StreamletInstance, Topic } +import cloudflow.blueprint.deployment.{ ApplicationDescriptor, StreamletDeployment, StreamletInstance, Topic } import cloudflow.blueprint.deployment.ApplicationDescriptorJsonFormat._ +import cloudflow.blueprint.VolumeMountDescriptor import cloudflow.blueprint.RunnerConfigUtils._ import cloudflow.streamlets.{ BooleanValidationType, DoubleValidationType, IntegerValidationType, StreamletExecution, StreamletLoader } import com.typesafe.config._ @@ -97,6 +98,46 @@ object LocalRunner extends StreamletLoader { } } + private def getRunnerConfig(appId: String, appVersion: String, deployment: StreamletDeployment): String = { + implicit val topicFormat = jsonFormat(Topic.apply, "id", "cluster", "config") + + def toRunnerJson(appId: String, appVersion: String, deployment: StreamletDeployment) = + JsObject( + "streamlet" -> JsObject( + "class_name" -> JsString(deployment.className), + "streamlet_ref" -> JsString(deployment.streamletName), + "context" -> JsObject( + "app_id" -> appId.toJson, + "app_version" -> appVersion.toJson, + "config" -> toJson(deployment.config), + "volume_mounts" -> toVolumeMountJson(deployment.volumeMounts), + "port_mappings" -> toPortMappingsJson(deployment.portMappings) + ) + ) + ) + + def toJson(config: Config) = config.root().render(ConfigRenderOptions.concise()).parseJson + + def toPortMappingsJson(portMappings: Map[String, Topic]) = + JsObject(portMappings.map { + case (portName, topic) => portName -> topic.toJson + }) + + def toVolumeMountJson(volumeMounts: Option[List[VolumeMountDescriptor]]) = + JsArray( + volumeMounts + .getOrElse(Vector()) + .map { + case VolumeMountDescriptor(name, path, accessMode, _) => + JsObject("name" -> JsString(name), "path" -> JsString(path), "access_mode" -> JsString(accessMode)) + } + .toVector + ) + + val map = Map("runner" -> toRunnerJson(appId, appVersion, deployment)) + JsObject("cloudflow" -> JsObject(map)).compactPrint + } + private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaHost: String): Unit = { val bootstrapServers = if (localConfig.hasPath(BootstrapServersKey)) localConfig.getString(BootstrapServersKey) else kafkaHost @@ -137,8 +178,8 @@ object LocalRunner extends StreamletLoader { StreamletDeployment.EndpointContainerPort + endpointIdx) deployment.endpoint.foreach(_ => endpointIdx += 1) - val runnerConfigObj = RunnerConfig(appId, appVersion, deployment) - val runnerConfig = addStorageConfig(ConfigFactory.parseString(runnerConfigObj.data), localStorageDirectory) + val runnerConfigObj = getRunnerConfig(appId, appVersion, deployment) + val runnerConfig = addStorageConfig(ConfigFactory.parseString(runnerConfigObj), localStorageDirectory) val patchedRunnerConfig = runnerConfig .withFallback(streamletParamConfig) diff --git a/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala b/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala index f6bbc37fc..1142f9961 100644 --- a/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala +++ b/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala @@ -28,20 +28,26 @@ trait RunnerConfigResolver { final val SecretConfigFile = "secret.conf" def makeConfig: Try[Config] = Try { - val configFilePathString = Option(System.getProperty("config.file")).getOrElse(s"$ConfigMountPath/$ConfigFile") + val configFilePathString = Option(System.getProperty("config.file")).getOrElse(s"$ConfigSecretMountPath/$ConfigFile") val configPath = Paths.get(configFilePathString) val secretPath = Paths.get(s"$ConfigSecretMountPath/$SecretConfigFile") + val applicationConfig = if (Files.exists(configPath)) { + configPath + } else { + Paths.get(s"$ConfigMountPath/$ConfigFile") + } + val config = if (Files.exists(secretPath)) { - println(s"Loading application.conf from: $configPath, secret config from: $secretPath") + println(s"Loading application.conf from: $applicationConfig, secret config from: $secretPath") // secret takes precedence, since it contains config. loadConfig(secretPath) - .withFallback(loadConfig(configPath)) + .withFallback(loadConfig(applicationConfig)) .withFallback(ConfigFactory.load) } else { - println(s"Loading application.conf from: $configPath") + println(s"Loading application.conf from: $applicationConfig") - loadConfig(configPath) + loadConfig(applicationConfig) .withFallback(ConfigFactory.load) } diff --git a/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/RunnerConfig.scala b/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/RunnerConfig.scala index 9c95b0b08..e71105227 100644 --- a/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/RunnerConfig.scala +++ b/tools/cloudflow-blueprint/src/main/scala/cloudflow/blueprint/deployment/RunnerConfig.scala @@ -18,59 +18,15 @@ package cloudflow.blueprint.deployment import java.io.File -import com.typesafe.config._ import spray.json._ -import cloudflow.blueprint.VolumeMountDescriptor - trait ConfigMapData { def filename: String def data: String } -case class RunnerConfig(data: String) extends ConfigMapData { - val filename: String = RunnerConfig.AppConfigFilename -} - -object RunnerConfig extends DefaultJsonProtocol with ConfigJsonFormat { +object RunnerConfig { val PortMappingsPath = "cloudflow.runner.streamlet.context.port_mappings" - val AppConfigFilename = "application.conf" - implicit val topicFormat = jsonFormat(Topic.apply, "id", "cluster", "config") - - def apply(appId: String, appVersion: String, deployment: StreamletDeployment): RunnerConfig = { - val map = Map("runner" -> toRunnerJson(appId, appVersion, deployment)) - RunnerConfig(JsObject("cloudflow" -> JsObject(map)).compactPrint) - } - - private def toRunnerJson(appId: String, appVersion: String, deployment: StreamletDeployment) = - JsObject( - "streamlet" -> JsObject( - "class_name" -> JsString(deployment.className), - "streamlet_ref" -> JsString(deployment.streamletName), - "context" -> JsObject( - "app_id" -> appId.toJson, - "app_version" -> appVersion.toJson, - "config" -> toJson(deployment.config), - "volume_mounts" -> toVolumeMountJson(deployment.volumeMounts), - "port_mappings" -> toPortMappingsJson(deployment.portMappings)))) - - private def toJson(config: Config) = config.root().render(ConfigRenderOptions.concise()).parseJson - - private def toPortMappingsJson(portMappings: Map[String, Topic]) = - JsObject(portMappings.map { - case (portName, topic) => portName -> topic.toJson - }) - - private def toVolumeMountJson(volumeMounts: Option[List[VolumeMountDescriptor]]) = - JsArray( - volumeMounts - .getOrElse(Vector()) - .map { - case VolumeMountDescriptor(name, path, accessMode, _) => - JsObject("name" -> JsString(name), "path" -> JsString(path), "access_mode" -> JsString(accessMode)) - } - .toVector) - } case class PrometheusConfig(data: String) extends ConfigMapData { diff --git a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala index 4054bd556..9dbfab49c 100644 --- a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala +++ b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala @@ -12,14 +12,65 @@ import scala.util.hashing.MurmurHash3 import akka.cli.cloudflow.{ CliException, CliLogger } import akka.cloudflow.config.{ CloudflowConfig, UnsafeCloudflowConfigLoader } import akka.datap.crd.App +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.annotation.{ JsonCreator, JsonInclude, JsonProperty } +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.{ JsonDeserializer, JsonNode } import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } import io.fabric8.kubernetes.client.utils.Serialization -private final case class StreamletConfigs(streamlet: Config, runtime: Config, pods: Config) +private final case class StreamletConfigs(streamlet: Config, runtime: Config, pods: Config, application: Config) trait WithConfiguration { val logger: CliLogger + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + private case class RunnerContext( + @JsonProperty("app_id") + appId: String, + @JsonProperty("app_version") + appVersion: String, + @JsonProperty("config") + config: JsonNode, + @JsonProperty("volume_mounts") + volumeMounts: JsonNode, + @JsonProperty("port_mappings") + portMappings: JsonNode) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + private case class StreamletConfig( + @JsonProperty("class_name") + className: String, + @JsonProperty("streamlet_ref") + streamletRef: String, + @JsonProperty("context") + context: RunnerContext) + + private def applicationRunnerConfig(appId: String, appVersion: String, deployment: App.Deployment): Config = { + ConfigFactory + .parseString( + Serialization + .jsonMapper() + .writeValueAsString(StreamletConfig( + className = deployment.className, + streamletRef = deployment.streamletName, + context = RunnerContext( + appId = appId, + appVersion = appVersion, + config = deployment.config, + volumeMounts = Serialization + .jsonMapper() + .readTree(Serialization.jsonMapper().writeValueAsString(deployment.volumeMounts)), + portMappings = Serialization + .jsonMapper() + .readTree(Serialization.jsonMapper().writeValueAsString(deployment.portMappings)))))) + .atPath("cloudflow.runner.streamlet") + } + private def referencedPvcsExists( cloudflowConfig: CloudflowConfig.CloudflowRoot, pvcs: () => Try[List[String]]): Try[Unit] = { @@ -297,6 +348,7 @@ trait WithConfiguration { val SecretDataKey = "secret.conf" val RuntimeConfigDataKey = "runtime-config.conf" val PodsConfigDataKey = "pods-config.conf" + val ApplicationDataKey = "application.conf" def streamletsConfigs( appCr: App.Cr, @@ -326,16 +378,19 @@ trait WithConfiguration { appCr.spec.deployments.map { deployment => val streamletConfig = CloudflowConfig.streamletConfig(deployment.streamletName, deployment.runtime, appConfig) val streamletWithPortMappingsConfig = portMappings(deployment, appConfig, streamletConfig, clustersConfig) + val applicationConfig = applicationRunnerConfig(appCr.name, appCr.spec.appVersion, deployment) deployment -> StreamletConfigs( streamlet = streamletWithPortMappingsConfig, runtime = CloudflowConfig.runtimeConfig(deployment.streamletName, deployment.runtime, appConfig), - pods = CloudflowConfig.podsConfig(deployment.streamletName, deployment.runtime, appConfig)) + pods = CloudflowConfig.podsConfig(deployment.streamletName, deployment.runtime, appConfig), + application = applicationConfig) }.toMap } } yield { res.map { case (k, v) => k -> Map( + ApplicationDataKey -> render(v.application), SecretDataKey -> render(v.streamlet), RuntimeConfigDataKey -> render(v.runtime), PodsConfigDataKey -> render(v.pods)) diff --git a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala index 57459318d..7e7e9c5d5 100644 --- a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala +++ b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala @@ -245,8 +245,7 @@ trait Runner[T <: HasMetadata] { .withBlockOwnerDeletion(true) .build() - val configData = - Vector(RunnerConfig(app.spec.appId, app.spec.appVersion, Runner.toBlueprint(deployment)), prometheusConfig) + val configData = Vector(prometheusConfig) val name = Name.ofConfigMap(deployment.name) new ConfigMapBuilder() From c2724ff496ab8f8186260ae24cfe5cb843ecacc2 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Mon, 5 Apr 2021 17:33:57 +0100 Subject: [PATCH 02/17] remove failing tests --- .../deployment/RunnerConfigSpec.scala | 103 ------------------ .../action/runner/RunnerActionsSpec.scala | 7 -- 2 files changed, 110 deletions(-) delete mode 100644 tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/RunnerConfigSpec.scala diff --git a/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/RunnerConfigSpec.scala b/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/RunnerConfigSpec.scala deleted file mode 100644 index 86b6c1529..000000000 --- a/tools/cloudflow-blueprint/src/test/scala/cloudflow/blueprint/deployment/RunnerConfigSpec.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (C) 2016-2021 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cloudflow.blueprint.deployment - -import collection.JavaConverters._ - -import com.typesafe.config.ConfigFactory -import org.scalatest._ -import org.scalatest.wordspec._ -import org.scalatest.matchers.must._ - -class RunnerConfigSpec extends AnyWordSpec with Matchers with OptionValues with EitherValues with Inspectors { - - "a RunnerConfig" should { - "generate the correct JSON (one streamlet per deployment)" in { - val runnerConfig = RunnerConfig(appId, appVersion, ingressDeployment) - val config = ConfigFactory.parseString(runnerConfig.data) - - val streamlet = config.getConfig("cloudflow.runner.streamlet") - - streamlet.getString("class_name") mustBe ingressDeployment.className - streamlet.getString("streamlet_ref") mustBe ingressDeployment.streamletName - - val streamletContext = streamlet.getConfig("context") - - streamletContext.getString("app_id") mustBe appId - streamletContext.getString("app_version") mustBe appVersion - - val portMappingConfig = streamletContext.getConfig("port_mappings") - val ports = portMappingConfig - .root() - .entrySet() - .asScala - .map(_.getKey) - .toVector - - ports must have size 1 - forExactly(1, ports) { port => - val topicConfig = portMappingConfig.getConfig(port) - - ingressDeployment.portMappings must contain( - ( - port, - Topic( - topicConfig.getString("id"), - if (topicConfig.hasPath("cluster")) - Option(topicConfig.getString("cluster")) - else None, - topicConfig.getConfig("config")))) - } - - streamletContext.getConfig(s"config") mustBe ingressDeployment.config - } - } - - import cloudflow.blueprint._ - import BlueprintBuilder._ - - case class Foo(name: String) - case class Bar(name: String) - - val appId = "monstrous-mite-12345" - val appVersion = "42-abcdef0" - val image = "image-1" - - val agentPaths = Map(ApplicationDescriptor.PrometheusAgentKey -> "/app/prometheus/prometheus.jar") - val kafkaBootstrapServers = - "kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092" - - val ingress = randomStreamlet().asIngress[Foo].withServerAttribute - val processor = randomStreamlet().asProcessor[Foo, Bar].withRuntime("spark") - - val ingressRef = ingress.ref("ingress") - val processorRef = processor.ref("processor") - - val blueprint = Blueprint() - .define(Vector(ingress, processor)) - .use(ingressRef) - .use(processorRef) - .connect(Topic(id = "foos"), ingressRef.out, processorRef.in) - .connect(Topic(id = "bars"), processorRef.out) - - val verifiedBlueprint = blueprint.verified.right.value - val descriptor = ApplicationDescriptor(appId, appVersion, image, verifiedBlueprint, agentPaths, BuildInfo.version) - - val allDeployments = descriptor.deployments - val ingressDeployment = allDeployments.find(_.streamletName == ingressRef.name).value - val processorDeployment = allDeployments.find(_.streamletName == processorRef.name).value -} diff --git a/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/runner/RunnerActionsSpec.scala b/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/runner/RunnerActionsSpec.scala index 8f998d6e3..701ee737c 100644 --- a/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/runner/RunnerActionsSpec.scala +++ b/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/runner/RunnerActionsSpec.scala @@ -249,13 +249,6 @@ class RunnerActionsSpec appId: String, appVersion: String, ctx: DeploymentContext) = { - val deployment = - app.deployments.find(deployment => Name.ofConfigMap(deployment.name) == configMap.getMetadata.getName).value - (configMap.getData.asScala must contain).key(RunnerConfig.AppConfigFilename) - val mountedAppConfiguration = ConfigFactory.parseString(configMap.getData.get(RunnerConfig.AppConfigFilename)) - val expectedAppConfiguration = - ConfigFactory.parseString(RunnerConfig(appId, appVersion, Runner.toBlueprint(deployment)).data) - mountedAppConfiguration mustEqual expectedAppConfiguration (configMap.getData.asScala must contain).key(PrometheusConfig.PrometheusConfigFilename) val mountedPromConfiguration = configMap.getData.get(PrometheusConfig.PrometheusConfigFilename) val expectedPromConfiguration = PrometheusConfig(ctx.akkaRunnerDefaults.prometheusRules).data From 094622fa4efd236c53684793e2d4b418a48d141d Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Tue, 6 Apr 2021 18:07:30 +0100 Subject: [PATCH 03/17] runner-config --- core/build.sbt | 15 ++- .../cloudflow/localrunner/LocalRunner.scala | 64 +++++------ tools/build.sbt | 12 +- .../execution/WithConfiguration.scala | 69 ++++-------- .../main/scala/cloudflow/runner/config.scala | 104 ++++++++++++++++++ .../scala/cloudflow/runner/ConfigSpec.scala | 76 +++++++++++++ tools/project/Dependencies.scala | 7 ++ 7 files changed, 259 insertions(+), 88 deletions(-) create mode 100644 tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala create mode 100644 tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala diff --git a/core/build.sbt b/core/build.sbt index b7521e97e..45121c468 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -376,6 +376,19 @@ lazy val blueprint = buildInfoPackage := "cloudflow.blueprint" ) +lazy val runnerConfig = + cloudflowModule("cloudflow-runner-config") + .enablePlugins(BuildInfoPlugin, ScalafmtPlugin) + .settings( + scalafmtOnCompile := false, + Compile / scalafmtCheck := true, + libraryDependencies ++= Vector(JacksonScalaModule) + ) + .settings( + Compile / unmanagedSourceDirectories += (ThisProject / baseDirectory).value / ".." / ".." / "tools" / "cloudflow-runner-config" / "src" / "main" / "scala", + crossScalaVersions := Vector(Version.Scala212, Version.Scala213) + ) + lazy val runner = cloudflowModule("cloudflow-runner") .enablePlugins(BuildInfoPlugin, ScalafmtPlugin) @@ -412,7 +425,7 @@ lazy val runner = lazy val localRunner = cloudflowModule("cloudflow-localrunner") .enablePlugins(BuildInfoPlugin, ScalafmtPlugin) - .dependsOn(streamlets, blueprint) + .dependsOn(streamlets, blueprint, runnerConfig) .settings( crossScalaVersions := Vector(Version.Scala212, Version.Scala213), scalafmtOnCompile := true diff --git a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala index ad38800ff..3183e50ce 100644 --- a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala +++ b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala @@ -19,7 +19,6 @@ package cloudflow.localrunner import java.io.{ Closeable, File, FileOutputStream, OutputStream, PrintStream } import java.lang.{ Runtime => JRuntime } import java.nio.file._ - import scala.concurrent.{ Await, Future } import scala.concurrent.duration.Duration import scala.util.{ Failure, Success, Try } @@ -29,9 +28,11 @@ import org.slf4j.LoggerFactory import spray.json._ import cloudflow.blueprint.deployment.{ ApplicationDescriptor, StreamletDeployment, StreamletInstance, Topic } import cloudflow.blueprint.deployment.ApplicationDescriptorJsonFormat._ -import cloudflow.blueprint.VolumeMountDescriptor import cloudflow.blueprint.RunnerConfigUtils._ import cloudflow.streamlets.{ BooleanValidationType, DoubleValidationType, IntegerValidationType, StreamletExecution, StreamletLoader } +import cloudflow.runner +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.config._ import scala.concurrent.ExecutionContext.Implicits.global @@ -98,44 +99,31 @@ object LocalRunner extends StreamletLoader { } } + private val mapper = new ObjectMapper().registerModule(new DefaultScalaModule()) private def getRunnerConfig(appId: String, appVersion: String, deployment: StreamletDeployment): String = { - implicit val topicFormat = jsonFormat(Topic.apply, "id", "cluster", "config") - - def toRunnerJson(appId: String, appVersion: String, deployment: StreamletDeployment) = - JsObject( - "streamlet" -> JsObject( - "class_name" -> JsString(deployment.className), - "streamlet_ref" -> JsString(deployment.streamletName), - "context" -> JsObject( - "app_id" -> appId.toJson, - "app_version" -> appVersion.toJson, - "config" -> toJson(deployment.config), - "volume_mounts" -> toVolumeMountJson(deployment.volumeMounts), - "port_mappings" -> toPortMappingsJson(deployment.portMappings) - ) - ) - ) - - def toJson(config: Config) = config.root().render(ConfigRenderOptions.concise()).parseJson - - def toPortMappingsJson(portMappings: Map[String, Topic]) = - JsObject(portMappings.map { - case (portName, topic) => portName -> topic.toJson - }) - - def toVolumeMountJson(volumeMounts: Option[List[VolumeMountDescriptor]]) = - JsArray( - volumeMounts - .getOrElse(Vector()) - .map { - case VolumeMountDescriptor(name, path, accessMode, _) => - JsObject("name" -> JsString(name), "path" -> JsString(path), "access_mode" -> JsString(accessMode)) - } - .toVector + def toJsonNode(config: Config) = + mapper.readTree(config.root().render(ConfigRenderOptions.concise().setJson(true).setOriginComments(false).setComments(false))) + + val streamletConfig = runner.config.Streamlet( + className = deployment.className, + streamletRef = deployment.streamletName, + context = runner.config.StreamletContext( + appId = appId, + appVersion = appVersion, + config = toJsonNode(deployment.config), + volumeMounts = deployment.volumeMounts.getOrElse(List.empty).map { vm => + runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode) + }, + portMappings = deployment.portMappings.map { + case (name, topic) => + name -> runner.config.Topic(id = topic.id, + // TODO: check with Ray the default + cluster = topic.cluster.getOrElse(""), + config = toJsonNode(topic.config)) + } ) - - val map = Map("runner" -> toRunnerJson(appId, appVersion, deployment)) - JsObject("cloudflow" -> JsObject(map)).compactPrint + ) + runner.config.toJson(streamletConfig) } private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaHost: String): Unit = { diff --git a/tools/build.sbt b/tools/build.sbt index 3a3b17675..b3dffd8a8 100644 --- a/tools/build.sbt +++ b/tools/build.sbt @@ -110,7 +110,7 @@ lazy val cloudflowCli = } }) .enablePlugins(BuildInfoPlugin, GraalVMNativeImagePlugin) - .dependsOn(cloudflowConfig) + .dependsOn(cloudflowConfig, cloudflowRunnerConfig) lazy val cloudflowIt = Project(id = "cloudflow-it", base = file("cloudflow-it")) @@ -270,6 +270,15 @@ lazy val cloudflowSbtPlugin = }, scriptedBufferLog := false) +lazy val cloudflowRunnerConfig = + Project(id = "cloudflow-runner-config", base = file("cloudflow-runner-config")) + .enablePlugins(BuildInfoPlugin, ScalafmtPlugin) + .settings(Dependencies.cloudflowRunnerConfig) + .settings( + scalaVersion := Dependencies.Scala213, + crossScalaVersions := Vector(Dependencies.Scala212, Dependencies.Scala213), + scalafmtOnCompile := true) + lazy val root = Project(id = "root", base = file(".")) .settings(name := "root", skip in publish := true, scalafmtOnCompile := true, crossScalaVersions := Seq()) .withId("root") @@ -285,4 +294,5 @@ lazy val root = Project(id = "root", base = file(".")) cloudflowNewItLibrary, cloudflowOperator, cloudflowSbtPlugin, + cloudflowRunnerConfig, tooling) diff --git a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala index 9dbfab49c..58946f757 100644 --- a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala +++ b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala @@ -12,10 +12,7 @@ import scala.util.hashing.MurmurHash3 import akka.cli.cloudflow.{ CliException, CliLogger } import akka.cloudflow.config.{ CloudflowConfig, UnsafeCloudflowConfigLoader } import akka.datap.crd.App -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.annotation.{ JsonCreator, JsonInclude, JsonProperty } -import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import com.fasterxml.jackson.databind.{ JsonDeserializer, JsonNode } +import cloudflow.runner import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } import io.fabric8.kubernetes.client.utils.Serialization @@ -24,51 +21,27 @@ private final case class StreamletConfigs(streamlet: Config, runtime: Config, po trait WithConfiguration { val logger: CliLogger - @JsonDeserialize(using = classOf[JsonDeserializer.None]) - @JsonInclude(Include.NON_NULL) - @JsonCreator - private case class RunnerContext( - @JsonProperty("app_id") - appId: String, - @JsonProperty("app_version") - appVersion: String, - @JsonProperty("config") - config: JsonNode, - @JsonProperty("volume_mounts") - volumeMounts: JsonNode, - @JsonProperty("port_mappings") - portMappings: JsonNode) - - @JsonDeserialize(using = classOf[JsonDeserializer.None]) - @JsonInclude(Include.NON_NULL) - @JsonCreator - private case class StreamletConfig( - @JsonProperty("class_name") - className: String, - @JsonProperty("streamlet_ref") - streamletRef: String, - @JsonProperty("context") - context: RunnerContext) - + // TODO: when names are finalized run the GraalVM assisted config private def applicationRunnerConfig(appId: String, appVersion: String, deployment: App.Deployment): Config = { - ConfigFactory - .parseString( - Serialization - .jsonMapper() - .writeValueAsString(StreamletConfig( - className = deployment.className, - streamletRef = deployment.streamletName, - context = RunnerContext( - appId = appId, - appVersion = appVersion, - config = deployment.config, - volumeMounts = Serialization - .jsonMapper() - .readTree(Serialization.jsonMapper().writeValueAsString(deployment.volumeMounts)), - portMappings = Serialization - .jsonMapper() - .readTree(Serialization.jsonMapper().writeValueAsString(deployment.portMappings)))))) - .atPath("cloudflow.runner.streamlet") + val configStreamlet = + runner.config.Streamlet( + streamletRef = deployment.streamletName, + className = deployment.className, + context = runner.config.StreamletContext( + appId = appId, + appVersion = appVersion, + config = deployment.config, + volumeMounts = Option(deployment.volumeMounts).getOrElse(Seq.empty).map { vm => + // TODO: check with Ray: name = vm.appId + runner.config.VolumeMount(name = vm.appId, path = vm.path, accessMode = vm.accessMode) + }, + portMappings = Option(deployment.portMappings).getOrElse(Map.empty).map { + case (name, pm) => + // TODO: check with Ray: cluster should be "default"? + name -> runner.config.Topic(id = pm.id, cluster = pm.cluster.getOrElse(""), config = pm.config) + })) + + ConfigFactory.parseString(runner.config.toJson(configStreamlet)) } private def referencedPvcsExists( diff --git a/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala b/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala new file mode 100644 index 000000000..ff6b0bcf5 --- /dev/null +++ b/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2016-2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloudflow.runner + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.annotation.{ JsonCreator, JsonInclude, JsonProperty } +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import scala.collection.immutable +import com.fasterxml.jackson.databind.{ JsonDeserializer, JsonNode, ObjectMapper } +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +object config { + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class Topic( + @JsonProperty("id") + id: String, + @JsonProperty("cluster") + cluster: String, + @JsonProperty("config") + config: JsonNode) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class VolumeMount( + @JsonProperty("name") + name: String, + @JsonProperty("path") + path: String, + @JsonProperty("access_mode") + accessMode: String) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class StreamletContext( + @JsonProperty("app_id") + appId: String, + @JsonProperty("app_version") + appVersion: String, + @JsonProperty("config") + config: JsonNode, + @JsonProperty("volume_mounts") + volumeMounts: immutable.Seq[VolumeMount] = immutable.Seq.empty, + @JsonProperty("port_mappings") + portMappings: Map[String, Topic] = Map.empty) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class Streamlet( + @JsonProperty("class_name") + className: String, + @JsonProperty("streamlet_ref") + streamletRef: String, + @JsonProperty("context") + context: StreamletContext) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class Runner( + @JsonProperty("streamlet") + streamlet: Streamlet) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class Cloudflow( + @JsonProperty("runner") + runner: Runner) + + @JsonDeserialize(using = classOf[JsonDeserializer.None]) + @JsonInclude(Include.NON_NULL) + @JsonCreator + case class CloudflowRoot( + @JsonProperty("cloudflow") + cloudflow: Cloudflow) + + private val mapper = new ObjectMapper().registerModule(new DefaultScalaModule()) + + def toJson(streamlet: Streamlet): String = + mapper.writeValueAsString(CloudflowRoot(cloudflow = Cloudflow(runner = Runner(streamlet = streamlet)))) + def fromJson(s: String): Streamlet = mapper.readValue(s, classOf[CloudflowRoot]).cloudflow.runner.streamlet + +} diff --git a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala new file mode 100644 index 000000000..c9419c0a8 --- /dev/null +++ b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala @@ -0,0 +1,76 @@ +package cloudflow.runner + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.typesafe.config.{ ConfigFactory, ConfigRenderOptions } +import org.scalatest.wordspec._ +import org.scalatest.matchers.must._ + +import scala.collection.immutable +import scala.collection.JavaConverters._ + +class ConfigSpec extends AnyWordSpec with Matchers { + + private val mapper = new ObjectMapper().registerModule(new DefaultScalaModule()) + private def asJson(s: String) = + mapper.readTree(ConfigFactory.parseString(s).root().render(ConfigRenderOptions.concise().setJson(true))) + + "a runner.config" should { + "generate the correct JSON" in { + // Arrange + val streamlet = config.Streamlet( + streamletRef = "reference", + className = "clazz", + context = config.StreamletContext( + appId = "application", + appVersion = "123", + config = asJson("{ foo: bar }"), + volumeMounts = immutable.Seq(config.VolumeMount(name = "foo1", path = "bar", accessMode = "readOnly")), + portMappings = + Map("port0" -> config.Topic(id = "id0", cluster = "cluster0", config = asJson("{ bar: baz }"))))) + + // Act + val str = config.toJson(streamlet) + + // Assert + val runnerConfig = ConfigFactory.parseString(str).getConfig("cloudflow.runner.streamlet") + runnerConfig.getString("streamlet_ref") mustBe "reference" + runnerConfig.getString("class_name") mustBe "clazz" + runnerConfig.getString("context.app_id") mustBe "application" + runnerConfig.getString("context.app_version") mustBe "123" + runnerConfig.getConfig("context.config").getString("foo") mustBe "bar" + val volumeMounts = runnerConfig.getConfigList("context.volume_mounts").asScala + volumeMounts.size mustBe 1 + volumeMounts.head.getString("name") mustBe "foo1" + volumeMounts.head.getString("path") mustBe "bar" + volumeMounts.head.getString("access_mode") mustBe "readOnly" + val port0Mapping = runnerConfig.getConfig("context.port_mappings").getConfig("port0") + port0Mapping.getString("id") mustBe "id0" + port0Mapping.getString("cluster") mustBe "cluster0" + port0Mapping.getConfig("config").getString("bar") mustBe "baz" + } + + "serialize and deserialize should be idempotent" in { + // Arrange + val streamlet = config.Streamlet( + streamletRef = "reference", + className = "clazz", + context = config.StreamletContext( + appId = "application", + appVersion = "123", + config = asJson("{ foo: bar }"), + volumeMounts = immutable.Seq(config.VolumeMount(name = "foo1", path = "bar", accessMode = "readOnly")), + portMappings = + Map("port0" -> config.Topic(id = "id0", cluster = "cluster0", config = asJson("{ bar: baz }"))))) + + // Act + val str = config.toJson(streamlet) + val deser = config.fromJson(str) + val res = config.toJson(deser) + + // Assert + str mustBe res + } + } + +} diff --git a/tools/project/Dependencies.scala b/tools/project/Dependencies.scala index 25fd20c98..be3e2318d 100644 --- a/tools/project/Dependencies.scala +++ b/tools/project/Dependencies.scala @@ -129,4 +129,11 @@ object Dependencies { Compile.testcontainersKafka, Compile.kafkaClient, Compile.scalatest % Test) + + val cloudflowRunnerConfig = + libraryDependencies ++= Seq( + Compile.jacksonScala, + Compile.typesafeConfig % Test, + Compile.scalatest % Test) + } From 57d0f3624504a690eb05fd308068f8a898e378c7 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Tue, 6 Apr 2021 18:09:04 +0100 Subject: [PATCH 04/17] missing headers --- .../src/test/scala/cloudflow/runner/ConfigSpec.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala index c9419c0a8..3116bba47 100644 --- a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala +++ b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + package cloudflow.runner import com.fasterxml.jackson.databind.ObjectMapper From f1e7aa3c3107b8b7044193d839b5dfcd41cb4dde Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Tue, 6 Apr 2021 18:13:28 +0100 Subject: [PATCH 05/17] fmt --- tools/project/Dependencies.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tools/project/Dependencies.scala b/tools/project/Dependencies.scala index be3e2318d..e107e8b5c 100644 --- a/tools/project/Dependencies.scala +++ b/tools/project/Dependencies.scala @@ -131,9 +131,6 @@ object Dependencies { Compile.scalatest % Test) val cloudflowRunnerConfig = - libraryDependencies ++= Seq( - Compile.jacksonScala, - Compile.typesafeConfig % Test, - Compile.scalatest % Test) + libraryDependencies ++= Seq(Compile.jacksonScala, Compile.typesafeConfig % Test, Compile.scalatest % Test) } From 4fa812243a2e7635972cca5e473b7cc58fb90ea8 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Tue, 6 Apr 2021 18:26:06 +0100 Subject: [PATCH 06/17] aggregate runner-config --- core/build.sbt | 1 + 1 file changed, 1 insertion(+) diff --git a/core/build.sbt b/core/build.sbt index 45121c468..7b7d176a5 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -98,6 +98,7 @@ lazy val root = flink, flinkTestkit, flinkTests, + runnerConfig, localRunner, runner, blueprint From bfc34866607a75ab67be2281c7fc09963366526a Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Wed, 7 Apr 2021 12:56:53 +0100 Subject: [PATCH 07/17] fix volume mount name --- .../akka/cli/cloudflow/execution/WithConfiguration.scala | 3 +-- tools/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala | 4 ++-- .../scala/cloudflow/operator/action/runner/AkkaRunner.scala | 4 ++-- .../scala/cloudflow/operator/action/runner/FlinkRunner.scala | 4 ++-- .../main/scala/cloudflow/operator/action/runner/Runner.scala | 2 +- .../scala/cloudflow/operator/action/runner/SparkRunner.scala | 4 ++-- .../scala/cloudflow/operator/action/TestApplication.scala | 2 +- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala index 58946f757..c8983e8c2 100644 --- a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala +++ b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala @@ -32,8 +32,7 @@ trait WithConfiguration { appVersion = appVersion, config = deployment.config, volumeMounts = Option(deployment.volumeMounts).getOrElse(Seq.empty).map { vm => - // TODO: check with Ray: name = vm.appId - runner.config.VolumeMount(name = vm.appId, path = vm.path, accessMode = vm.accessMode) + runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode) }, portMappings = Option(deployment.portMappings).getOrElse(Map.empty).map { case (name, pm) => diff --git a/tools/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala b/tools/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala index 89a5c40f3..1de9b27c0 100644 --- a/tools/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala +++ b/tools/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala @@ -196,8 +196,8 @@ object App { @JsonDeserialize(using = classOf[JsonDeserializer.None]) @JsonCreator final case class VolumeMountDescriptor( - @JsonProperty("app_id") - appId: String, + @JsonProperty("name") + name: String, @JsonProperty("path") path: String, @JsonProperty("access_mode") diff --git a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/AkkaRunner.scala b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/AkkaRunner.scala index 612a57949..7f8b58166 100644 --- a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/AkkaRunner.scala +++ b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/AkkaRunner.scala @@ -213,7 +213,7 @@ final class AkkaRunner(akkaRunnerDefaults: AkkaRunnerDefaults) extends Runner[De val pvcRefVolumes = streamletToDeploy.map(_.descriptor.volumeMounts.map { mount => new VolumeBuilder() - .withName(mount.appId) + .withName(mount.name) .withPersistentVolumeClaim( new PersistentVolumeClaimVolumeSourceBuilder() .withClaimName(mount.pvcName.getOrElse("")) @@ -228,7 +228,7 @@ final class AkkaRunner(akkaRunnerDefaults: AkkaRunnerDefaults) extends Runner[De } new VolumeMountBuilder() - .withName(mount.appId) + .withName(mount.name) .withMountPath(mount.path) .withReadOnly(readOnly) .build() diff --git a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/FlinkRunner.scala b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/FlinkRunner.scala index f02c495cd..9e10c7d10 100644 --- a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/FlinkRunner.scala +++ b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/FlinkRunner.scala @@ -361,7 +361,7 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner // Streamlet volume mounting (Defined by Streamlet.volumeMounts API) val streamletPvcVolume = streamletToDeploy.toVector.flatMap(_.descriptor.volumeMounts.map { mount => new VolumeBuilder() - .withName(mount.appId) + .withName(mount.name) .withNewPersistentVolumeClaim() .withClaimName(mount.pvcName.getOrElse("")) .endPersistentVolumeClaim() @@ -387,7 +387,7 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner private def makeVolumeMountsSpec(streamletToDeploy: Option[App.Streamlet]): Vector[VolumeMount] = { val streamletVolumeMount = streamletToDeploy.toVector.flatMap(_.descriptor.volumeMounts.map { mount => new VolumeMountBuilder() - .withName(mount.appId) + .withName(mount.name) .withMountPath(mount.path) .build() }) diff --git a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala index 7e7e9c5d5..eaa045c0e 100644 --- a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala +++ b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala @@ -91,7 +91,7 @@ object Runner { else Some(deployment.volumeMounts.map { vmd => VolumeMountDescriptor( - name = vmd.appId, + name = vmd.name, path = vmd.path, accessMode = vmd.accessMode, pvcName = vmd.pvcName.getOrElse("")) diff --git a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/SparkRunner.scala b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/SparkRunner.scala index 02031e4b0..17dce0966 100644 --- a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/SparkRunner.scala +++ b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/SparkRunner.scala @@ -200,7 +200,7 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner // Streamlet volume mounting (Defined by Streamlet.volumeMounts API) val streamletPvcVolume = streamletToDeploy.toSeq.flatMap(_.descriptor.volumeMounts.map { mount => new VolumeBuilder() - .withName(mount.appId) + .withName(mount.name) .withNewPersistentVolumeClaim() .withClaimName(mount.pvcName.getOrElse("")) .endPersistentVolumeClaim() @@ -208,7 +208,7 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner }) val streamletVolumeMount = streamletToDeploy.toSeq.flatMap(_.descriptor.volumeMounts.map { mount => new VolumeMountBuilder() - .withName(mount.appId) + .withName(mount.name) .withMountPath(mount.path) .build() }) diff --git a/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/TestApplication.scala b/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/TestApplication.scala index 13b76fc04..f64a0313f 100644 --- a/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/TestApplication.scala +++ b/tools/cloudflow-operator/src/test/scala/cloudflow/operator/action/TestApplication.scala @@ -74,7 +74,7 @@ object CloudflowApplicationSpecBuilder { } private def toVolumeMount(vmd: VolumeMountDescriptor) = { - App.VolumeMountDescriptor(appId = vmd.name, path = vmd.path, accessMode = vmd.accessMode, pvcName = { + App.VolumeMountDescriptor(name = vmd.name, path = vmd.path, accessMode = vmd.accessMode, pvcName = { if (vmd.pvcName == null || vmd.pvcName.isEmpty) { Some(vmd.pvcName) } else None From 3cb6adcb758a5ef2ad81ee43854c4f6699dff896 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 18:48:52 +0100 Subject: [PATCH 08/17] no nitpicks --- .../cli/cloudflow/execution/WithConfiguration.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala index c8983e8c2..76741034a 100644 --- a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala +++ b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala @@ -12,7 +12,6 @@ import scala.util.hashing.MurmurHash3 import akka.cli.cloudflow.{ CliException, CliLogger } import akka.cloudflow.config.{ CloudflowConfig, UnsafeCloudflowConfigLoader } import akka.datap.crd.App -import cloudflow.runner import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } import io.fabric8.kubernetes.client.utils.Serialization @@ -24,23 +23,23 @@ trait WithConfiguration { // TODO: when names are finalized run the GraalVM assisted config private def applicationRunnerConfig(appId: String, appVersion: String, deployment: App.Deployment): Config = { val configStreamlet = - runner.config.Streamlet( + cloudflow.runner.config.Streamlet( streamletRef = deployment.streamletName, className = deployment.className, - context = runner.config.StreamletContext( + context = cloudflow.runner.config.StreamletContext( appId = appId, appVersion = appVersion, config = deployment.config, volumeMounts = Option(deployment.volumeMounts).getOrElse(Seq.empty).map { vm => - runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode) + cloudflow.runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode) }, portMappings = Option(deployment.portMappings).getOrElse(Map.empty).map { case (name, pm) => // TODO: check with Ray: cluster should be "default"? - name -> runner.config.Topic(id = pm.id, cluster = pm.cluster.getOrElse(""), config = pm.config) + name -> cloudflow.runner.config.Topic(id = pm.id, cluster = pm.cluster.getOrElse(""), config = pm.config) })) - ConfigFactory.parseString(runner.config.toJson(configStreamlet)) + ConfigFactory.parseString(cloudflow.runner.config.toJson(configStreamlet)) } private def referencedPvcsExists( From e175602159e5287e643089d8b75c0b27536d7d61 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 18:51:01 +0100 Subject: [PATCH 09/17] no nitpicks part 2 --- .../cloudflow/localrunner/LocalRunner.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala index 3183e50ce..ee2e0a16b 100644 --- a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala +++ b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala @@ -30,7 +30,6 @@ import cloudflow.blueprint.deployment.{ ApplicationDescriptor, StreamletDeployme import cloudflow.blueprint.deployment.ApplicationDescriptorJsonFormat._ import cloudflow.blueprint.RunnerConfigUtils._ import cloudflow.streamlets.{ BooleanValidationType, DoubleValidationType, IntegerValidationType, StreamletExecution, StreamletLoader } -import cloudflow.runner import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.config._ @@ -104,26 +103,26 @@ object LocalRunner extends StreamletLoader { def toJsonNode(config: Config) = mapper.readTree(config.root().render(ConfigRenderOptions.concise().setJson(true).setOriginComments(false).setComments(false))) - val streamletConfig = runner.config.Streamlet( + val streamletConfig = cloudflow.runner.config.Streamlet( className = deployment.className, streamletRef = deployment.streamletName, - context = runner.config.StreamletContext( + context = cloudflow.runner.config.StreamletContext( appId = appId, appVersion = appVersion, config = toJsonNode(deployment.config), volumeMounts = deployment.volumeMounts.getOrElse(List.empty).map { vm => - runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode) + cloudflow.runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode) }, portMappings = deployment.portMappings.map { case (name, topic) => - name -> runner.config.Topic(id = topic.id, - // TODO: check with Ray the default - cluster = topic.cluster.getOrElse(""), - config = toJsonNode(topic.config)) + name -> cloudflow.runner.config.Topic(id = topic.id, + // TODO: check with Ray the default + cluster = topic.cluster.getOrElse(""), + config = toJsonNode(topic.config)) } ) ) - runner.config.toJson(streamletConfig) + cloudflow.runner.config.toJson(streamletConfig) } private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaHost: String): Unit = { From 70994bcbf13ed316d342b3d0ed42f5041dcbd5fd Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 18:54:21 +0100 Subject: [PATCH 10/17] backward compat note --- .../cloudflow/runner/RunnerConfigResolver.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala b/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala index 1142f9961..28d4d2fa5 100644 --- a/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala +++ b/core/cloudflow-runner/src/main/scala/cloudflow/runner/RunnerConfigResolver.scala @@ -27,16 +27,20 @@ trait RunnerConfigResolver { final val ConfigSecretMountPath = "/etc/cloudflow-runner-secret" final val SecretConfigFile = "secret.conf" + def backwardsCompatConfig(configPath: Path): Path = + if (Files.exists(configPath)) { + configPath + } else { + // Backward compatibility: Use the ConfigMap populated by the operator + Paths.get(s"$ConfigMountPath/$ConfigFile") + } + def makeConfig: Try[Config] = Try { val configFilePathString = Option(System.getProperty("config.file")).getOrElse(s"$ConfigSecretMountPath/$ConfigFile") val configPath = Paths.get(configFilePathString) val secretPath = Paths.get(s"$ConfigSecretMountPath/$SecretConfigFile") - val applicationConfig = if (Files.exists(configPath)) { - configPath - } else { - Paths.get(s"$ConfigMountPath/$ConfigFile") - } + val applicationConfig = backwardsCompatConfig(configPath) val config = if (Files.exists(secretPath)) { println(s"Loading application.conf from: $applicationConfig, secret config from: $secretPath") From b66b3d4a6747e348d499e19b7e2e3a82240ad4c2 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 19:00:06 +0100 Subject: [PATCH 11/17] cluster in topic is optional --- .../src/main/scala/cloudflow/localrunner/LocalRunner.scala | 5 +---- .../akka/cli/cloudflow/execution/WithConfiguration.scala | 2 +- .../src/main/scala/cloudflow/runner/config.scala | 2 +- .../src/test/scala/cloudflow/runner/ConfigSpec.scala | 4 ++-- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala index ee2e0a16b..43074af5b 100644 --- a/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala +++ b/core/cloudflow-localrunner/src/main/scala/cloudflow/localrunner/LocalRunner.scala @@ -115,10 +115,7 @@ object LocalRunner extends StreamletLoader { }, portMappings = deployment.portMappings.map { case (name, topic) => - name -> cloudflow.runner.config.Topic(id = topic.id, - // TODO: check with Ray the default - cluster = topic.cluster.getOrElse(""), - config = toJsonNode(topic.config)) + name -> cloudflow.runner.config.Topic(id = topic.id, cluster = topic.cluster, config = toJsonNode(topic.config)) } ) ) diff --git a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala index 76741034a..20df77823 100644 --- a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala +++ b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala @@ -36,7 +36,7 @@ trait WithConfiguration { portMappings = Option(deployment.portMappings).getOrElse(Map.empty).map { case (name, pm) => // TODO: check with Ray: cluster should be "default"? - name -> cloudflow.runner.config.Topic(id = pm.id, cluster = pm.cluster.getOrElse(""), config = pm.config) + name -> cloudflow.runner.config.Topic(id = pm.id, cluster = pm.cluster, config = pm.config) })) ConfigFactory.parseString(cloudflow.runner.config.toJson(configStreamlet)) diff --git a/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala b/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala index ff6b0bcf5..15a44d3c1 100644 --- a/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala +++ b/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala @@ -33,7 +33,7 @@ object config { @JsonProperty("id") id: String, @JsonProperty("cluster") - cluster: String, + cluster: Option[String], @JsonProperty("config") config: JsonNode) diff --git a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala index 3116bba47..f4454ce21 100644 --- a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala +++ b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala @@ -31,7 +31,7 @@ class ConfigSpec extends AnyWordSpec with Matchers { config = asJson("{ foo: bar }"), volumeMounts = immutable.Seq(config.VolumeMount(name = "foo1", path = "bar", accessMode = "readOnly")), portMappings = - Map("port0" -> config.Topic(id = "id0", cluster = "cluster0", config = asJson("{ bar: baz }"))))) + Map("port0" -> config.Topic(id = "id0", cluster = Some("cluster0"), config = asJson("{ bar: baz }"))))) // Act val str = config.toJson(streamlet) @@ -65,7 +65,7 @@ class ConfigSpec extends AnyWordSpec with Matchers { config = asJson("{ foo: bar }"), volumeMounts = immutable.Seq(config.VolumeMount(name = "foo1", path = "bar", accessMode = "readOnly")), portMappings = - Map("port0" -> config.Topic(id = "id0", cluster = "cluster0", config = asJson("{ bar: baz }"))))) + Map("port0" -> config.Topic(id = "id0", cluster = Some("cluster0"), config = asJson("{ bar: baz }"))))) // Act val str = config.toJson(streamlet) From 580f11c1c49ce6464ff61f67a561794e865b8e55 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 19:01:24 +0100 Subject: [PATCH 12/17] default to None --- .../src/main/scala/cloudflow/runner/config.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala b/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala index 15a44d3c1..f9015f289 100644 --- a/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala +++ b/tools/cloudflow-runner-config/src/main/scala/cloudflow/runner/config.scala @@ -33,7 +33,7 @@ object config { @JsonProperty("id") id: String, @JsonProperty("cluster") - cluster: Option[String], + cluster: Option[String] = None, @JsonProperty("config") config: JsonNode) From 61348cc0406fb0493a9a933e71690c401a6150d3 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 19:02:13 +0100 Subject: [PATCH 13/17] final --- .../scala/akka/cli/cloudflow/execution/WithConfiguration.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala index 20df77823..3cceeef69 100644 --- a/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala +++ b/tools/cloudflow-cli/src/main/scala/akka/cli/cloudflow/execution/WithConfiguration.scala @@ -35,7 +35,6 @@ trait WithConfiguration { }, portMappings = Option(deployment.portMappings).getOrElse(Map.empty).map { case (name, pm) => - // TODO: check with Ray: cluster should be "default"? name -> cloudflow.runner.config.Topic(id = pm.id, cluster = pm.cluster, config = pm.config) })) From 296f5a9fc97dfeb099dc2da160e16c978751530c Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Thu, 8 Apr 2021 19:12:49 +0100 Subject: [PATCH 14/17] train graalvm again --- .../META-INF/native-image/reflect-config.json | 72 +++++++++++++++++++ .../native-image/resource-config.json | 6 ++ 2 files changed, 78 insertions(+) diff --git a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json index 8c9d36336..1a324a974 100644 --- a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json +++ b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json @@ -356,6 +356,78 @@ "name":"ch.qos.logback.core.status.NopStatusListener", "methods":[{"name":"","parameterTypes":[] }] }, +{ + "name":"cloudflow.runner.config$Cloudflow", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$Cloudflow$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, +{ + "name":"cloudflow.runner.config$CloudflowRoot", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$CloudflowRoot$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, +{ + "name":"cloudflow.runner.config$Runner", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$Runner$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, +{ + "name":"cloudflow.runner.config$Streamlet", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$Streamlet$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, +{ + "name":"cloudflow.runner.config$StreamletContext", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$StreamletContext$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, +{ + "name":"cloudflow.runner.config$Topic", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$Topic$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, { "name":"com.fasterxml.jackson.annotation.JsonCreator[]" }, diff --git a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json index 04da9fc08..09d59f3ef 100644 --- a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json +++ b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json @@ -29,6 +29,12 @@ {"pattern":"\\Qakka/datap/crd/App$Streamlet.class\\E"}, {"pattern":"\\Qakka/datap/crd/App$StreamletStatus.class\\E"}, {"pattern":"\\Qakka/datap/crd/App$VolumeMountDescriptor.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$Cloudflow.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$CloudflowRoot.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$Runner.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$Streamlet.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$StreamletContext.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$Topic.class\\E"}, {"pattern":"\\Qcom/fasterxml/jackson/module/scala/build.properties\\E"}, {"pattern":"\\Qio/fabric8/kubernetes/api/model/ConfigMap.class\\E"}, {"pattern":"\\Qio/fabric8/kubernetes/api/model/ConfigMapList.class\\E"}, From 0311f3cf8e9ef7a3d4adf745daf60d294c210669 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Fri, 9 Apr 2021 08:46:15 +0100 Subject: [PATCH 15/17] minor --- .../src/test/scala/cloudflow/runner/ConfigSpec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala index f4454ce21..999b2aaa2 100644 --- a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala +++ b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala @@ -34,10 +34,10 @@ class ConfigSpec extends AnyWordSpec with Matchers { Map("port0" -> config.Topic(id = "id0", cluster = Some("cluster0"), config = asJson("{ bar: baz }"))))) // Act - val str = config.toJson(streamlet) + val jsonStr = config.toJson(streamlet) // Assert - val runnerConfig = ConfigFactory.parseString(str).getConfig("cloudflow.runner.streamlet") + val runnerConfig = ConfigFactory.parseString(jsonStr).getConfig("cloudflow.runner.streamlet") runnerConfig.getString("streamlet_ref") mustBe "reference" runnerConfig.getString("class_name") mustBe "clazz" runnerConfig.getString("context.app_id") mustBe "application" @@ -68,12 +68,12 @@ class ConfigSpec extends AnyWordSpec with Matchers { Map("port0" -> config.Topic(id = "id0", cluster = Some("cluster0"), config = asJson("{ bar: baz }"))))) // Act - val str = config.toJson(streamlet) - val deser = config.fromJson(str) + val jsonStr = config.toJson(streamlet) + val deser = config.fromJson(jsonStr) val res = config.toJson(deser) // Assert - str mustBe res + jsonStr mustBe res } } From eb349c91b71a5e95afb672d60e1d9d325812a97a Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Fri, 9 Apr 2021 08:57:11 +0100 Subject: [PATCH 16/17] renaming val to avoid compiler go nuts --- .../src/test/scala/cloudflow/runner/ConfigSpec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala index 999b2aaa2..84ed051e1 100644 --- a/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala +++ b/tools/cloudflow-runner-config/src/test/scala/cloudflow/runner/ConfigSpec.scala @@ -43,11 +43,11 @@ class ConfigSpec extends AnyWordSpec with Matchers { runnerConfig.getString("context.app_id") mustBe "application" runnerConfig.getString("context.app_version") mustBe "123" runnerConfig.getConfig("context.config").getString("foo") mustBe "bar" - val volumeMounts = runnerConfig.getConfigList("context.volume_mounts").asScala - volumeMounts.size mustBe 1 - volumeMounts.head.getString("name") mustBe "foo1" - volumeMounts.head.getString("path") mustBe "bar" - volumeMounts.head.getString("access_mode") mustBe "readOnly" + val vms = runnerConfig.getConfigList("context.volume_mounts").asScala + vms.size mustBe 1 + vms.head.getString("name") mustBe "foo1" + vms.head.getString("path") mustBe "bar" + vms.head.getString("access_mode") mustBe "readOnly" val port0Mapping = runnerConfig.getConfig("context.port_mappings").getConfig("port0") port0Mapping.getString("id") mustBe "id0" port0Mapping.getString("cluster") mustBe "cluster0" From b77ae8a2ea4e310c1d85de463bca232dbc291545 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Fri, 9 Apr 2021 09:06:30 +0100 Subject: [PATCH 17/17] graalvm volume mount --- .../META-INF/native-image/reflect-config.json | 12 ++++++++++ .../native-image/resource-config.json | 1 + .../main/scala/cli/CodepathCoverageMain.scala | 22 +++++++++++++++++++ 3 files changed, 35 insertions(+) diff --git a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json index 1a324a974..395c7c7ad 100644 --- a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json +++ b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/reflect-config.json @@ -428,6 +428,18 @@ "allPublicMethods":true, "fields":[{"name":"MODULE$"}] }, +{ + "name":"cloudflow.runner.config$VolumeMount", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true, + "allPublicConstructors":true +}, +{ + "name":"cloudflow.runner.config$VolumeMount$", + "allPublicMethods":true, + "fields":[{"name":"MODULE$"}] +}, { "name":"com.fasterxml.jackson.annotation.JsonCreator[]" }, diff --git a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json index 09d59f3ef..6de2c5293 100644 --- a/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json +++ b/tools/cloudflow-cli/src/main/resources/META-INF/native-image/resource-config.json @@ -35,6 +35,7 @@ {"pattern":"\\Qcloudflow/runner/config$Streamlet.class\\E"}, {"pattern":"\\Qcloudflow/runner/config$StreamletContext.class\\E"}, {"pattern":"\\Qcloudflow/runner/config$Topic.class\\E"}, + {"pattern":"\\Qcloudflow/runner/config$VolumeMount.class\\E"}, {"pattern":"\\Qcom/fasterxml/jackson/module/scala/build.properties\\E"}, {"pattern":"\\Qio/fabric8/kubernetes/api/model/ConfigMap.class\\E"}, {"pattern":"\\Qio/fabric8/kubernetes/api/model/ConfigMapList.class\\E"}, diff --git a/tools/tooling/src/main/scala/cli/CodepathCoverageMain.scala b/tools/tooling/src/main/scala/cli/CodepathCoverageMain.scala index ca3f5e87d..4003c63e4 100644 --- a/tools/tooling/src/main/scala/cli/CodepathCoverageMain.scala +++ b/tools/tooling/src/main/scala/cli/CodepathCoverageMain.scala @@ -143,5 +143,27 @@ object CodepathCoverageMain extends App { .jsonMapper() .readValue("{}", classOf[io.fabric8.kubernetes.api.model.EndpointsList]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.Topic]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.VolumeMount]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.StreamletContext]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.Streamlet]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.Runner]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.Cloudflow]) + Serialization + .jsonMapper() + .readValue("{}", classOf[cloudflow.runner.config.CloudflowRoot]) + System.exit(0) }