Skip to content

Commit

Permalink
Merge pull request #25 from henriqueribeiro/sns
Browse files Browse the repository at this point in the history
Actors to push metadata to AWS SNS
  • Loading branch information
henriqueribeiro authored Aug 30, 2022
2 parents 87cffaa + 8abf84c commit 490d683
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 0 deletions.
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ object Dependencies {
"ecr",
"ecrpublic",
"secretsmanager",
"sns",
"eventbridge",
).map(artifactName => "software.amazon.awssdk" % artifactName % awsSdkV)

private val googleCloudDependencies = List(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cromwell.services.metadata.impl.aws

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import com.typesafe.config.Config
import cromwell.cloudsupport.aws.AwsConfiguration
import cromwell.core.Dispatcher.ServiceDispatcher
import cromwell.services.metadata.MetadataEvent
import cromwell.services.metadata.MetadataService.{MetadataWriteFailure, MetadataWriteSuccess, PutMetadataAction, PutMetadataActionAndRespond}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.eventbridge.EventBridgeClient
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry
import spray.json.enrichAny

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}


/**
* An actor that publishes metadata events to AWS EventBridge
* @param serviceConfig the source of service config information
* @param globalConfig the source of global config information
* @param serviceRegistryActor the actor for registering services
* @see cromwell.services.metadata.impl.aws.HybridEventBridgeMetadataServiceActor
*/
class AwsEventBridgeMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging {
implicit val ec: ExecutionContextExecutor = context.dispatcher

//setup EB client
val busName: String = serviceConfig.getString("aws.busName")

val awsConfig: AwsConfiguration = AwsConfiguration(globalConfig)
val credentialsProviderChain: AwsCredentialsProviderChain =
AwsCredentialsProviderChain.of(awsConfig.authsByName.values.map(_.provider()).toSeq :_*)

lazy val eventBrClient : EventBridgeClient = EventBridgeClient.builder()
.region(awsConfig.region.getOrElse(Region.US_EAST_1))
.credentialsProvider(credentialsProviderChain)
.build();

def publishMessages(events: Iterable[MetadataEvent]): Future[Unit] = {
import AwsEventBridgeMetadataServiceActor.EnhancedMetadataEvents

val eventsJson = events.toJson
//if there are no events then don't publish anything
if( eventsJson.length < 1) { return Future(())}
log.debug(f"Publishing to $busName : $eventsJson")

val reqEntry = PutEventsRequestEntry.builder()
.eventBusName(busName)
.source("cromwell")
.detailType("cromwell-metadata-event")
.detail(eventsJson.mkString(","))
.build()

val eventsRequest = PutEventsRequest.builder()
.entries(reqEntry)
.build()

Future {
eventBrClient.putEvents(eventsRequest)
() //return unit
}
}

override def receive: Receive = {
case action: PutMetadataAction =>
publishMessages(action.events).failed foreach { e =>
log.error(e, "Failed to post metadata: " + action.events)
}
case action: PutMetadataActionAndRespond =>
publishMessages(action.events) onComplete {
case Success(_) => action.replyTo ! MetadataWriteSuccess(action.events)
case Failure(e) => action.replyTo ! MetadataWriteFailure(e, action.events)
}
}
}

object AwsEventBridgeMetadataServiceActor {
def props(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef): Props = {
Props(new AwsEventBridgeMetadataServiceActor(serviceConfig, globalConfig, serviceRegistryActor)).withDispatcher(ServiceDispatcher)
}

implicit class EnhancedMetadataEvents(val e: Iterable[MetadataEvent]) extends AnyVal {
import cromwell.services.metadata.MetadataJsonSupport._

def toJson: Seq[String] = e.map(_.toJson.toString()).toSeq
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package cromwell.services.metadata.impl.aws

import java.util.UUID

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import com.typesafe.config.Config
import cromwell.cloudsupport.aws.AwsConfiguration
import cromwell.core.Dispatcher.ServiceDispatcher
import cromwell.services.metadata.MetadataEvent
import cromwell.services.metadata.MetadataService.{MetadataWriteFailure, MetadataWriteSuccess, PutMetadataAction, PutMetadataActionAndRespond}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.PublishRequest
import spray.json.enrichAny

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}


/**
* An actor that publishes metadata events to AWS SNS
* @param serviceConfig the source of service config information
* @param globalConfig the source of global config information
* @param serviceRegistryActor the actor for registering services
* @see cromwell.services.metadata.impl.sns.HybridSnsMetadataServiceActor
*/
class AwsSnsMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging {
implicit val ec: ExecutionContextExecutor = context.dispatcher

//setup sns client
val topicArn: String = serviceConfig.getString("aws.topicArn")

val awsConfig: AwsConfiguration = AwsConfiguration(globalConfig)
val credentialsProviderChain: AwsCredentialsProviderChain =
AwsCredentialsProviderChain.of(awsConfig.authsByName.values.map(_.provider()).toSeq :_*)

lazy val snsClient: SnsClient = SnsClient.builder()
.region(awsConfig.region.getOrElse(Region.US_EAST_1))
.credentialsProvider(credentialsProviderChain)
.build()

def publishMessages(events: Iterable[MetadataEvent]): Future[Unit] = {
import AwsSnsMetadataServiceActor.EnhancedMetadataEvents

val eventsJson = events.toJson
//if there are no events then don't publish anything
if( eventsJson.length < 1) { return Future(())}
log.debug(f"Publishing to $topicArn : $eventsJson")

val message = PublishRequest.builder()
.message("[" + eventsJson.mkString(",") + "]")
.topicArn(topicArn)
.subject("cromwell-metadata-event")

if (topicArn.endsWith(".fifo")) {
message
.messageGroupId("cromwell")
.messageDeduplicationId(UUID.randomUUID().toString())
}

Future {
snsClient.publish(message
.build())
() //return unit
}
}

override def receive: Receive = {
case action: PutMetadataAction =>
publishMessages(action.events).failed foreach { e =>
log.error(e, "Failed to post metadata: " + action.events)
}
case action: PutMetadataActionAndRespond =>
publishMessages(action.events) onComplete {
case Success(_) => action.replyTo ! MetadataWriteSuccess(action.events)
case Failure(e) => action.replyTo ! MetadataWriteFailure(e, action.events)
}
}
}

object AwsSnsMetadataServiceActor {
def props(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef): Props = {
Props(new AwsSnsMetadataServiceActor(serviceConfig, globalConfig, serviceRegistryActor)).withDispatcher(ServiceDispatcher)
}

implicit class EnhancedMetadataEvents(val e: Iterable[MetadataEvent]) extends AnyVal {
import cromwell.services.metadata.MetadataJsonSupport._

def toJson: Seq[String] = e.map(_.toJson.toString()).toSeq
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cromwell.services.metadata.impl.aws

import akka.actor.{Actor, ActorLogging, ActorRef}
import com.typesafe.config.Config
import cromwell.services.metadata.MetadataService.{PutMetadataAction, PutMetadataActionAndRespond}
import cromwell.services.metadata.impl.MetadataServiceActor


/**
* A metadata service implementation which will function as a standard metadata service but also push all metadata
* events to AWS EventBridge. This class closely follows the pattern established in the
* HybridPubSubMetadataServiceActor
*
* Under the hood it maintains its own MetadataServiceActor and AwsEventBridgeMetadataServiceActor. All messages are routed
* to the MetadataServiceActor. PutMetadataActions are also sent to the AwsEventBridgeMetadataServiceActor. PutMetadataActionAndRespond
* messages will be sent to the EventBridgeMetadataServiceActor as a standard PutMetadataAction, i.e. only the standard
* metadata service will be ACKing the request.
*
* To use this actor something similar to the following should be present in the cromwell.conf file:
* <pre>
* services {
* MetadataService {
* class="cromwell.services.metadata.impl.aws.HybridEventBridgeMetadataServiceActor"
* config {
* aws {
* application-name = "cromwell"
* auths = [{
* name = "default"
* scheme = "default"
* }]
* region = "us-east-1"
* busName = "cromwell-metadata"
* }
* }
* }
* }
* </pre>
*
* @see cromwell.services.metadata.impl.aws.AwsEventBridgeMetadataServiceActor
*/
class HybridEventBridgeMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging {
val standardMetadataActor: ActorRef = context.actorOf(MetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor))
val awsEventBridgeMetadataActor: ActorRef = context.actorOf(AwsEventBridgeMetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor))

override def receive = {
case action: PutMetadataAction =>
standardMetadataActor forward action
awsEventBridgeMetadataActor forward action
case action: PutMetadataActionAndRespond =>
standardMetadataActor forward action
awsEventBridgeMetadataActor forward PutMetadataAction(action.events)
case anythingElse => standardMetadataActor forward anythingElse
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cromwell.services.metadata.impl.aws

import akka.actor.{Actor, ActorLogging, ActorRef}
import com.typesafe.config.Config
import cromwell.services.metadata.MetadataService.{PutMetadataAction, PutMetadataActionAndRespond}
import cromwell.services.metadata.impl.MetadataServiceActor


/**
* A metadata service implementation which will function as a standard metadata service but also push all metadata
* events to AWS SNS (Simple Notification Service). This class closely follows the pattern established in the
* HybridPubSubMetadataServiceActor
*
* Under the hood it maintains its own MetadataServiceActor and AwsSnsMetadataServiceActor. All messages are routed
* to the MetadataServiceActor. PutMetadataActions are also sent to the AwsSnsMetadataServiceActor. PutMetadataActionAndRespond
* messages will be sent to the SnsMetadataServiceActor as a standard PutMetadataAction, i.e. only the standard
* metadata service will be ACKing the request.
*
* To use this actor something similar to the following should be present in the cromwell.conf file:
* <pre>
* services {
* MetadataService {
* class="cromwell.services.metadata.impl.sns.HybridSnsMetadataServiceActor"
* config {
* aws {
* application-name = "cromwell"
* auths = [{
* name = "default"
* scheme = "default"
* }]
* region = "us-east-1"
* topicArn = "arn:aws:sns:us-east-1:1111111111111:cromwell-metadata"
* }
* }
* }
* }
* </pre>
*
* @see cromwell.services.metadata.impl.sns.AwsSnsMetadataServiceActor
*/
class HybridSnsMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging {
val standardMetadataActor: ActorRef = context.actorOf(MetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor))
val awsSnsMetadataActor: ActorRef = context.actorOf(AwsSnsMetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor))

override def receive = {
case action: PutMetadataAction =>
standardMetadataActor forward action
awsSnsMetadataActor forward action
case action: PutMetadataActionAndRespond =>
standardMetadataActor forward action
awsSnsMetadataActor forward PutMetadataAction(action.events)
case anythingElse => standardMetadataActor forward anythingElse
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package cromwell.services.metadata.impl.aws

import java.time.OffsetDateTime

import akka.actor.{ActorInitializationException, ActorRef, Props}
import akka.testkit.{EventFilter, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.core.WorkflowId
import cromwell.services.ServicesSpec
import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue}


class AwsEventBridgeMetadataServiceActorSpec extends ServicesSpec {
import AwsEventBridgeMetadataServiceActorSpec._

val registryProbe: ActorRef = TestProbe().ref

"An AwsEventBridgeMetadataActor with an empty serviceConfig" should {
"fail to build" in {
EventFilter[ActorInitializationException](occurrences = 1) intercept {
system.actorOf(Props(new AwsEventBridgeMetadataServiceActor(emptyConfig, emptyConfig, registryProbe)))
}
}
}

"An AwsEventBridgeMetadataActor with a bus name and configuration" should {
"successfully build" in {
system.actorOf(Props(new AwsEventBridgeMetadataServiceActor(configWithBus, emptyConfig, registryProbe)))
}

"process an event" in {
val actor = system.actorOf(Props(new AwsEventBridgeMetadataServiceActor(configWithBus, emptyConfig, registryProbe)))
actor ! event
}
}
}

object AwsSnsMetadataServiceActorSpec {

// This doesn't include a topic so should be a failure
val emptyConfig: Config = ConfigFactory.empty()

val configWithBus: Config = ConfigFactory.parseString(
"""
|aws {
| application-name = "cromwell"
| auths = [{
| name = "default"
| scheme = "default"
| }]
| region = "us-east-1"
| busName = "cromwell-metadata"
|}
""".stripMargin
)

val event: MetadataEvent = MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "key"),
Option(MetadataValue("value")), OffsetDateTime.now)
}

Loading

0 comments on commit 490d683

Please sign in to comment.