Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runner config from the CLI #1014

Merged
merged 17 commits into from
Apr 9, 2021
16 changes: 15 additions & 1 deletion core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ lazy val root =
flink,
flinkTestkit,
flinkTests,
runnerConfig,
localRunner,
runner,
blueprint
Expand Down Expand Up @@ -376,6 +377,19 @@ lazy val blueprint =
buildInfoPackage := "cloudflow.blueprint"
)

lazy val runnerConfig =
cloudflowModule("cloudflow-runner-config")
.enablePlugins(BuildInfoPlugin, ScalafmtPlugin)
.settings(
scalafmtOnCompile := false,
Compile / scalafmtCheck := true,
libraryDependencies ++= Vector(JacksonScalaModule)
)
.settings(
Compile / unmanagedSourceDirectories += (ThisProject / baseDirectory).value / ".." / ".." / "tools" / "cloudflow-runner-config" / "src" / "main" / "scala",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's chat about tools / core again ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

back to the core !

crossScalaVersions := Vector(Version.Scala212, Version.Scala213)
)

lazy val runner =
cloudflowModule("cloudflow-runner")
.enablePlugins(BuildInfoPlugin, ScalafmtPlugin)
Expand Down Expand Up @@ -412,7 +426,7 @@ lazy val runner =
lazy val localRunner =
cloudflowModule("cloudflow-localrunner")
.enablePlugins(BuildInfoPlugin, ScalafmtPlugin)
.dependsOn(streamlets, blueprint)
.dependsOn(streamlets, blueprint, runnerConfig)
.settings(
crossScalaVersions := Vector(Version.Scala212, Version.Scala213),
scalafmtOnCompile := true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ package cloudflow.localrunner
import java.io.{ Closeable, File, FileOutputStream, OutputStream, PrintStream }
import java.lang.{ Runtime => JRuntime }
import java.nio.file._

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import com.typesafe.config.{ Config, ConfigFactory }
import org.slf4j.LoggerFactory
import spray.json._
import cloudflow.blueprint.deployment.{ ApplicationDescriptor, RunnerConfig, StreamletDeployment, StreamletInstance, Topic }
import cloudflow.blueprint.deployment.{ ApplicationDescriptor, StreamletDeployment, StreamletInstance, Topic }
import cloudflow.blueprint.deployment.ApplicationDescriptorJsonFormat._
import cloudflow.blueprint.RunnerConfigUtils._
import cloudflow.streamlets.{ BooleanValidationType, DoubleValidationType, IntegerValidationType, StreamletExecution, StreamletLoader }
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.config._

import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -97,6 +98,30 @@ object LocalRunner extends StreamletLoader {
}
}

private val mapper = new ObjectMapper().registerModule(new DefaultScalaModule())
private def getRunnerConfig(appId: String, appVersion: String, deployment: StreamletDeployment): String = {
def toJsonNode(config: Config) =
mapper.readTree(config.root().render(ConfigRenderOptions.concise().setJson(true).setOriginComments(false).setComments(false)))

val streamletConfig = cloudflow.runner.config.Streamlet(
className = deployment.className,
streamletRef = deployment.streamletName,
context = cloudflow.runner.config.StreamletContext(
appId = appId,
appVersion = appVersion,
config = toJsonNode(deployment.config),
volumeMounts = deployment.volumeMounts.getOrElse(List.empty).map { vm =>
cloudflow.runner.config.VolumeMount(name = vm.name, path = vm.path, accessMode = vm.accessMode)
},
portMappings = deployment.portMappings.map {
case (name, topic) =>
name -> cloudflow.runner.config.Topic(id = topic.id, cluster = topic.cluster, config = toJsonNode(topic.config))
}
)
)
cloudflow.runner.config.toJson(streamletConfig)
}

private def run(appDescriptor: ApplicationDescriptor, localConfig: Config, kafkaHost: String): Unit = {
val bootstrapServers =
if (localConfig.hasPath(BootstrapServersKey)) localConfig.getString(BootstrapServersKey) else kafkaHost
Expand Down Expand Up @@ -137,8 +162,8 @@ object LocalRunner extends StreamletLoader {
StreamletDeployment.EndpointContainerPort + endpointIdx)
deployment.endpoint.foreach(_ => endpointIdx += 1)

val runnerConfigObj = RunnerConfig(appId, appVersion, deployment)
val runnerConfig = addStorageConfig(ConfigFactory.parseString(runnerConfigObj.data), localStorageDirectory)
val runnerConfigObj = getRunnerConfig(appId, appVersion, deployment)
val runnerConfig = addStorageConfig(ConfigFactory.parseString(runnerConfigObj), localStorageDirectory)

val patchedRunnerConfig = runnerConfig
.withFallback(streamletParamConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,31 @@ trait RunnerConfigResolver {
final val ConfigSecretMountPath = "/etc/cloudflow-runner-secret"
final val SecretConfigFile = "secret.conf"

def backwardsCompatConfig(configPath: Path): Path =
if (Files.exists(configPath)) {
configPath
} else {
// Backward compatibility: Use the ConfigMap populated by the operator
Paths.get(s"$ConfigMountPath/$ConfigFile")
}

def makeConfig: Try[Config] = Try {
val configFilePathString = Option(System.getProperty("config.file")).getOrElse(s"$ConfigMountPath/$ConfigFile")
val configFilePathString = Option(System.getProperty("config.file")).getOrElse(s"$ConfigSecretMountPath/$ConfigFile")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly confusing, maybe we can chat.

val configPath = Paths.get(configFilePathString)
val secretPath = Paths.get(s"$ConfigSecretMountPath/$SecretConfigFile")

val applicationConfig = backwardsCompatConfig(configPath)

val config = if (Files.exists(secretPath)) {
println(s"Loading application.conf from: $configPath, secret config from: $secretPath")
println(s"Loading application.conf from: $applicationConfig, secret config from: $secretPath")
// secret takes precedence, since it contains config.
loadConfig(secretPath)
.withFallback(loadConfig(configPath))
.withFallback(loadConfig(applicationConfig))
.withFallback(ConfigFactory.load)
} else {
println(s"Loading application.conf from: $configPath")
println(s"Loading application.conf from: $applicationConfig")

loadConfig(configPath)
loadConfig(applicationConfig)
.withFallback(ConfigFactory.load)
}

Expand Down
12 changes: 11 additions & 1 deletion tools/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ lazy val cloudflowCli =
}
})
.enablePlugins(BuildInfoPlugin, GraalVMNativeImagePlugin)
.dependsOn(cloudflowConfig)
.dependsOn(cloudflowConfig, cloudflowRunnerConfig)

lazy val cloudflowIt =
Project(id = "cloudflow-it", base = file("cloudflow-it"))
Expand Down Expand Up @@ -270,6 +270,15 @@ lazy val cloudflowSbtPlugin =
},
scriptedBufferLog := false)

lazy val cloudflowRunnerConfig =
Project(id = "cloudflow-runner-config", base = file("cloudflow-runner-config"))
.enablePlugins(BuildInfoPlugin, ScalafmtPlugin)
.settings(Dependencies.cloudflowRunnerConfig)
.settings(
scalaVersion := Dependencies.Scala213,
crossScalaVersions := Vector(Dependencies.Scala212, Dependencies.Scala213),
scalafmtOnCompile := true)

lazy val root = Project(id = "root", base = file("."))
.settings(name := "root", skip in publish := true, scalafmtOnCompile := true, crossScalaVersions := Seq())
.withId("root")
Expand All @@ -285,4 +294,5 @@ lazy val root = Project(id = "root", base = file("."))
cloudflowNewItLibrary,
cloudflowOperator,
cloudflowSbtPlugin,
cloudflowRunnerConfig,
tooling)
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,15 @@ package cloudflow.blueprint.deployment

import java.io.File

import com.typesafe.config._
import spray.json._

import cloudflow.blueprint.VolumeMountDescriptor

trait ConfigMapData {
def filename: String
def data: String
}

case class RunnerConfig(data: String) extends ConfigMapData {
val filename: String = RunnerConfig.AppConfigFilename
}

object RunnerConfig extends DefaultJsonProtocol with ConfigJsonFormat {
object RunnerConfig {
val PortMappingsPath = "cloudflow.runner.streamlet.context.port_mappings"
val AppConfigFilename = "application.conf"
implicit val topicFormat = jsonFormat(Topic.apply, "id", "cluster", "config")

def apply(appId: String, appVersion: String, deployment: StreamletDeployment): RunnerConfig = {
val map = Map("runner" -> toRunnerJson(appId, appVersion, deployment))
RunnerConfig(JsObject("cloudflow" -> JsObject(map)).compactPrint)
}

private def toRunnerJson(appId: String, appVersion: String, deployment: StreamletDeployment) =
JsObject(
"streamlet" -> JsObject(
"class_name" -> JsString(deployment.className),
"streamlet_ref" -> JsString(deployment.streamletName),
"context" -> JsObject(
"app_id" -> appId.toJson,
"app_version" -> appVersion.toJson,
"config" -> toJson(deployment.config),
"volume_mounts" -> toVolumeMountJson(deployment.volumeMounts),
"port_mappings" -> toPortMappingsJson(deployment.portMappings))))

private def toJson(config: Config) = config.root().render(ConfigRenderOptions.concise()).parseJson

private def toPortMappingsJson(portMappings: Map[String, Topic]) =
JsObject(portMappings.map {
case (portName, topic) => portName -> topic.toJson
})

private def toVolumeMountJson(volumeMounts: Option[List[VolumeMountDescriptor]]) =
JsArray(
volumeMounts
.getOrElse(Vector())
.map {
case VolumeMountDescriptor(name, path, accessMode, _) =>
JsObject("name" -> JsString(name), "path" -> JsString(path), "access_mode" -> JsString(accessMode))
}
.toVector)

}

case class PrometheusConfig(data: String) extends ConfigMapData {
Expand Down

This file was deleted.

Loading