Skip to content

Commit

Permalink
Merge pull request #262 from ie3-institute/sp/#242-kafka-runtime-sink
Browse files Browse the repository at this point in the history
RuntimeEvent kafka sink
  • Loading branch information
t-ober authored Aug 2, 2022
2 parents 0f08be8 + 8323864 commit ef2af1d
Show file tree
Hide file tree
Showing 18 changed files with 826 additions and 395 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Relevant scientific papers have been added to the documentation [#139](https://github.com/ie3-institute/simona/issues/139)
- Add troubleshooting section to Users guide [#160](https://github.com/ie3-institute/simona/issues/160)
- Added Kafka sink for results [#24](https://github.com/ie3-institute/simona/issues/24)
- Added Kafka sink for runtime events, re-implemented RuntimeEventListener in akka typed [#242](https://github.com/ie3-institute/simona/issues/242)

### Changed
- Re-organizing test resources into their respective packages [#105](https://github.com/ie3-institute/simona/issues/105)
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ dependencies {
testRuntimeOnly 'com.vladsch.flexmark:flexmark-all:0.64.0' //scalatest html output
testImplementation group: 'org.pegdown', name: 'pegdown', version: '1.6.0'
testImplementation "com.typesafe.akka:akka-testkit_${scalaVersion}:${akkaVersion}" // akka testkit
testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${scalaVersion}:${akkaVersion}"

// testcontainers
testImplementation "com.dimafeng:testcontainers-scala-scalatest_${scalaVersion}:${testContainerVersion}"
Expand All @@ -122,10 +123,10 @@ dependencies {

/* CORE Akka */
implementation "com.typesafe.akka:akka-actor_${scalaVersion}:${akkaVersion}"
implementation "com.typesafe.akka:akka-actor-typed_${scalaVersion}:${akkaVersion}"
implementation "com.typesafe.akka:akka-slf4j_${scalaVersion}:${akkaVersion}"
implementation "com.typesafe.akka:akka-cluster_${scalaVersion}:${akkaVersion}"
implementation "com.lightbend.akka:akka-stream-alpakka-csv_${scalaVersion}:3.0.4"
implementation "com.typesafe.akka:akka-actor_${scalaVersion}:${akkaVersion}"
implementation "com.typesafe.akka:akka-cluster-sharding_${scalaVersion}:${akkaVersion}"
implementation "com.typesafe.akka:akka-cluster-tools_${scalaVersion}:${akkaVersion}"

Expand Down
14 changes: 14 additions & 0 deletions src/main/resources/config/config-template.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ ResultKafkaParams {
topicNodeRes = string
}

#@define extends KafkaParams
RuntimeKafkaParams {
base: KafkaParams
topic = string
}

#@define
BaseOutputConfig {
notifier: string # Result event notifier
Expand Down Expand Up @@ -232,6 +238,14 @@ simona.runtime.selected_subgrids = [int] // todo convert this into a list of obj
#@optional
simona.runtime.selected_volt_lvls = [VoltLvlConfig]

simona.runtime.listener = {
#@optional
eventsToProcess = [string]

#@optional
kafka = RuntimeKafkaParams
}

simona.runtime.participant = {
requestVoltageDeviationThreshold = "double | 1E-14" # Nodal voltages deviating more than this between requests in the same tick, are considered being different
load = {
Expand Down
18 changes: 18 additions & 0 deletions src/main/scala/edu/ie3/simona/config/ConfigFailFast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ case object ConfigFailFast extends LazyLogging {
simonaConfig.simona.runtime.participant
)

/* Check the runtime listener configuration */
checkRuntimeListenerConfiguration(
simonaConfig.simona.runtime.listener
)

/* Check if the provided combination of data source and parameters are valid */
checkGridDataSource(simonaConfig.simona.input.grid.datasource)

Expand Down Expand Up @@ -196,6 +201,7 @@ case object ConfigFailFast extends LazyLogging {
/** Check time configuration
*
* @param timeConfig
* the time config
*/
private def checkDateTime(
timeConfig: SimonaConfig.Simona.Time
Expand Down Expand Up @@ -263,6 +269,18 @@ case object ConfigFailFast extends LazyLogging {
.foreach(checkSpecificLoadModelConfig)
}

/** Check the runtime event listener config
* @param listenerConfig
* the runtime listener config
*/
private def checkRuntimeListenerConfiguration(
listenerConfig: SimonaConfig.Simona.Runtime.Listener
): Unit = {
listenerConfig.kafka.foreach(kafka =>
checkKafkaParams(kafka, Seq(kafka.topic))
)
}

/** Check participants's basic runtime configurations, as well as in default
* as in individual configs. This comprises
* i.e. uuid and scaling factor
Expand Down
99 changes: 99 additions & 0 deletions src/main/scala/edu/ie3/simona/config/SimonaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,63 @@ object SimonaConfig {

}

final case class RuntimeKafkaParams(
override val bootstrapServers: java.lang.String,
override val linger: scala.Int,
override val runId: java.lang.String,
override val schemaRegistryUrl: java.lang.String,
topic: java.lang.String
) extends KafkaParams(bootstrapServers, linger, runId, schemaRegistryUrl)
object RuntimeKafkaParams {
def apply(
c: com.typesafe.config.Config,
parentPath: java.lang.String,
$tsCfgValidator: $TsCfgValidator
): SimonaConfig.RuntimeKafkaParams = {
SimonaConfig.RuntimeKafkaParams(
topic = $_reqStr(parentPath, c, "topic", $tsCfgValidator),
bootstrapServers =
$_reqStr(parentPath, c, "bootstrapServers", $tsCfgValidator),
linger = $_reqInt(parentPath, c, "linger", $tsCfgValidator),
runId = $_reqStr(parentPath, c, "runId", $tsCfgValidator),
schemaRegistryUrl =
$_reqStr(parentPath, c, "schemaRegistryUrl", $tsCfgValidator)
)
}
private def $_reqInt(
parentPath: java.lang.String,
c: com.typesafe.config.Config,
path: java.lang.String,
$tsCfgValidator: $TsCfgValidator
): scala.Int = {
if (c == null) 0
else
try c.getInt(path)
catch {
case e: com.typesafe.config.ConfigException =>
$tsCfgValidator.addBadPath(parentPath + path, e)
0
}
}

private def $_reqStr(
parentPath: java.lang.String,
c: com.typesafe.config.Config,
path: java.lang.String,
$tsCfgValidator: $TsCfgValidator
): java.lang.String = {
if (c == null) null
else
try c.getString(path)
catch {
case e: com.typesafe.config.ConfigException =>
$tsCfgValidator.addBadPath(parentPath + path, e)
null
}
}

}

final case class VoltLvlConfig(
id: java.lang.String,
vNom: java.lang.String
Expand Down Expand Up @@ -1815,11 +1872,47 @@ object SimonaConfig {
}

final case class Runtime(
listener: SimonaConfig.Simona.Runtime.Listener,
participant: SimonaConfig.Simona.Runtime.Participant,
selected_subgrids: scala.Option[scala.List[scala.Int]],
selected_volt_lvls: scala.Option[scala.List[SimonaConfig.VoltLvlConfig]]
)
object Runtime {
final case class Listener(
eventsToProcess: scala.Option[scala.List[java.lang.String]],
kafka: scala.Option[SimonaConfig.RuntimeKafkaParams]
)
object Listener {
def apply(
c: com.typesafe.config.Config,
parentPath: java.lang.String,
$tsCfgValidator: $TsCfgValidator
): SimonaConfig.Simona.Runtime.Listener = {
SimonaConfig.Simona.Runtime.Listener(
eventsToProcess =
if (c.hasPathOrNull("eventsToProcess"))
scala.Some(
$_L$_str(
c.getList("eventsToProcess"),
parentPath,
$tsCfgValidator
)
)
else None,
kafka =
if (c.hasPathOrNull("kafka"))
scala.Some(
SimonaConfig.RuntimeKafkaParams(
c.getConfig("kafka"),
parentPath + "kafka.",
$tsCfgValidator
)
)
else None
)
}
}

final case class Participant(
evcs: SimonaConfig.Simona.Runtime.Participant.Evcs,
fixedFeedIn: SimonaConfig.Simona.Runtime.Participant.FixedFeedIn,
Expand Down Expand Up @@ -2105,6 +2198,12 @@ object SimonaConfig {
$tsCfgValidator: $TsCfgValidator
): SimonaConfig.Simona.Runtime = {
SimonaConfig.Simona.Runtime(
listener = SimonaConfig.Simona.Runtime.Listener(
if (c.hasPathOrNull("listener")) c.getConfig("listener")
else com.typesafe.config.ConfigFactory.parseString("listener{}"),
parentPath + "listener.",
$tsCfgValidator
),
participant = SimonaConfig.Simona.Runtime.Participant(
if (c.hasPathOrNull("participant")) c.getConfig("participant")
else com.typesafe.config.ConfigFactory.parseString("participant{}"),
Expand Down
40 changes: 0 additions & 40 deletions src/main/scala/edu/ie3/simona/event/listener/DummyListener.scala

This file was deleted.

Loading

0 comments on commit ef2af1d

Please sign in to comment.