diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1c2c4460c8e..519c13d4c37 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -312,6 +312,8 @@ object Dependencies { "ecr", "ecrpublic", "secretsmanager", + "sns", + "eventbridge", ).map(artifactName => "software.amazon.awssdk" % artifactName % awsSdkV) private val googleCloudDependencies = List( diff --git a/services/src/main/scala/cromwell/services/metadata/impl/aws/EventBridgeMetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/aws/EventBridgeMetadataServiceActor.scala new file mode 100644 index 00000000000..b22fccb5170 --- /dev/null +++ b/services/src/main/scala/cromwell/services/metadata/impl/aws/EventBridgeMetadataServiceActor.scala @@ -0,0 +1,91 @@ +package cromwell.services.metadata.impl.aws + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import com.typesafe.config.Config +import cromwell.cloudsupport.aws.AwsConfiguration +import cromwell.core.Dispatcher.ServiceDispatcher +import cromwell.services.metadata.MetadataEvent +import cromwell.services.metadata.MetadataService.{MetadataWriteFailure, MetadataWriteSuccess, PutMetadataAction, PutMetadataActionAndRespond} +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.eventbridge.EventBridgeClient +import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest +import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry +import spray.json.enrichAny + +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.{Failure, Success} + + +/** + * An actor that publishes metadata events to AWS EventBridge + * @param serviceConfig the source of service config information + * @param globalConfig the source of global config information + * @param serviceRegistryActor the actor for registering services + * @see cromwell.services.metadata.impl.aws.HybridEventBridgeMetadataServiceActor + */ +class AwsEventBridgeMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging { + implicit val ec: ExecutionContextExecutor = context.dispatcher + + //setup EB client + val busName: String = serviceConfig.getString("aws.busName") + + val awsConfig: AwsConfiguration = AwsConfiguration(globalConfig) + val credentialsProviderChain: AwsCredentialsProviderChain = + AwsCredentialsProviderChain.of(awsConfig.authsByName.values.map(_.provider()).toSeq :_*) + + lazy val eventBrClient : EventBridgeClient = EventBridgeClient.builder() + .region(awsConfig.region.getOrElse(Region.US_EAST_1)) + .credentialsProvider(credentialsProviderChain) + .build(); + + def publishMessages(events: Iterable[MetadataEvent]): Future[Unit] = { + import AwsEventBridgeMetadataServiceActor.EnhancedMetadataEvents + + val eventsJson = events.toJson + //if there are no events then don't publish anything + if( eventsJson.length < 1) { return Future(())} + log.debug(f"Publishing to $busName : $eventsJson") + + val reqEntry = PutEventsRequestEntry.builder() + .eventBusName(busName) + .source("cromwell") + .detailType("cromwell-metadata-event") + .detail(eventsJson.mkString(",")) + .build() + + val eventsRequest = PutEventsRequest.builder() + .entries(reqEntry) + .build() + + Future { + eventBrClient.putEvents(eventsRequest) + () //return unit + } + } + + override def receive: Receive = { + case action: PutMetadataAction => + publishMessages(action.events).failed foreach { e => + log.error(e, "Failed to post metadata: " + action.events) + } + case action: PutMetadataActionAndRespond => + publishMessages(action.events) onComplete { + case Success(_) => action.replyTo ! MetadataWriteSuccess(action.events) + case Failure(e) => action.replyTo ! MetadataWriteFailure(e, action.events) + } + } +} + +object AwsEventBridgeMetadataServiceActor { + def props(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef): Props = { + Props(new AwsEventBridgeMetadataServiceActor(serviceConfig, globalConfig, serviceRegistryActor)).withDispatcher(ServiceDispatcher) + } + + implicit class EnhancedMetadataEvents(val e: Iterable[MetadataEvent]) extends AnyVal { + import cromwell.services.metadata.MetadataJsonSupport._ + + def toJson: Seq[String] = e.map(_.toJson.toString()).toSeq + } +} + diff --git a/services/src/main/scala/cromwell/services/metadata/impl/aws/SnsMetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/aws/SnsMetadataServiceActor.scala new file mode 100644 index 00000000000..724f0af6c03 --- /dev/null +++ b/services/src/main/scala/cromwell/services/metadata/impl/aws/SnsMetadataServiceActor.scala @@ -0,0 +1,93 @@ +package cromwell.services.metadata.impl.aws + +import java.util.UUID + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import com.typesafe.config.Config +import cromwell.cloudsupport.aws.AwsConfiguration +import cromwell.core.Dispatcher.ServiceDispatcher +import cromwell.services.metadata.MetadataEvent +import cromwell.services.metadata.MetadataService.{MetadataWriteFailure, MetadataWriteSuccess, PutMetadataAction, PutMetadataActionAndRespond} +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.sns.SnsClient +import software.amazon.awssdk.services.sns.model.PublishRequest +import spray.json.enrichAny + +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.{Failure, Success} + + +/** + * An actor that publishes metadata events to AWS SNS + * @param serviceConfig the source of service config information + * @param globalConfig the source of global config information + * @param serviceRegistryActor the actor for registering services + * @see cromwell.services.metadata.impl.sns.HybridSnsMetadataServiceActor + */ +class AwsSnsMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging { + implicit val ec: ExecutionContextExecutor = context.dispatcher + + //setup sns client + val topicArn: String = serviceConfig.getString("aws.topicArn") + + val awsConfig: AwsConfiguration = AwsConfiguration(globalConfig) + val credentialsProviderChain: AwsCredentialsProviderChain = + AwsCredentialsProviderChain.of(awsConfig.authsByName.values.map(_.provider()).toSeq :_*) + + lazy val snsClient: SnsClient = SnsClient.builder() + .region(awsConfig.region.getOrElse(Region.US_EAST_1)) + .credentialsProvider(credentialsProviderChain) + .build() + + def publishMessages(events: Iterable[MetadataEvent]): Future[Unit] = { + import AwsSnsMetadataServiceActor.EnhancedMetadataEvents + + val eventsJson = events.toJson + //if there are no events then don't publish anything + if( eventsJson.length < 1) { return Future(())} + log.debug(f"Publishing to $topicArn : $eventsJson") + + val message = PublishRequest.builder() + .message("[" + eventsJson.mkString(",") + "]") + .topicArn(topicArn) + .subject("cromwell-metadata-event") + + if (topicArn.endsWith(".fifo")) { + message + .messageGroupId("cromwell") + .messageDeduplicationId(UUID.randomUUID().toString()) + } + + Future { + snsClient.publish(message + .build()) + () //return unit + } + } + + override def receive: Receive = { + case action: PutMetadataAction => + publishMessages(action.events).failed foreach { e => + log.error(e, "Failed to post metadata: " + action.events) + } + case action: PutMetadataActionAndRespond => + publishMessages(action.events) onComplete { + case Success(_) => action.replyTo ! MetadataWriteSuccess(action.events) + case Failure(e) => action.replyTo ! MetadataWriteFailure(e, action.events) + } + } +} + +object AwsSnsMetadataServiceActor { + def props(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef): Props = { + Props(new AwsSnsMetadataServiceActor(serviceConfig, globalConfig, serviceRegistryActor)).withDispatcher(ServiceDispatcher) + } + + implicit class EnhancedMetadataEvents(val e: Iterable[MetadataEvent]) extends AnyVal { + import cromwell.services.metadata.MetadataJsonSupport._ + + def toJson: Seq[String] = e.map(_.toJson.toString()).toSeq + } +} + diff --git a/services/src/main/scala/cromwell/services/metadata/impl/hybridaws/HybridEventBridgeMetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/hybridaws/HybridEventBridgeMetadataServiceActor.scala new file mode 100644 index 00000000000..fc76207ba9d --- /dev/null +++ b/services/src/main/scala/cromwell/services/metadata/impl/hybridaws/HybridEventBridgeMetadataServiceActor.scala @@ -0,0 +1,55 @@ +package cromwell.services.metadata.impl.aws + +import akka.actor.{Actor, ActorLogging, ActorRef} +import com.typesafe.config.Config +import cromwell.services.metadata.MetadataService.{PutMetadataAction, PutMetadataActionAndRespond} +import cromwell.services.metadata.impl.MetadataServiceActor + + +/** + * A metadata service implementation which will function as a standard metadata service but also push all metadata + * events to AWS EventBridge. This class closely follows the pattern established in the + * HybridPubSubMetadataServiceActor + * + * Under the hood it maintains its own MetadataServiceActor and AwsEventBridgeMetadataServiceActor. All messages are routed + * to the MetadataServiceActor. PutMetadataActions are also sent to the AwsEventBridgeMetadataServiceActor. PutMetadataActionAndRespond + * messages will be sent to the EventBridgeMetadataServiceActor as a standard PutMetadataAction, i.e. only the standard + * metadata service will be ACKing the request. + * + * To use this actor something similar to the following should be present in the cromwell.conf file: + *
+ * services { + * MetadataService { + * class="cromwell.services.metadata.impl.aws.HybridEventBridgeMetadataServiceActor" + * config { + * aws { + * application-name = "cromwell" + * auths = [{ + * name = "default" + * scheme = "default" + * }] + * region = "us-east-1" + * busName = "cromwell-metadata" + * } + * } + * } + * } + *+ * + * @see cromwell.services.metadata.impl.aws.AwsEventBridgeMetadataServiceActor + */ +class HybridEventBridgeMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging { + val standardMetadataActor: ActorRef = context.actorOf(MetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor)) + val awsEventBridgeMetadataActor: ActorRef = context.actorOf(AwsEventBridgeMetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor)) + + override def receive = { + case action: PutMetadataAction => + standardMetadataActor forward action + awsEventBridgeMetadataActor forward action + case action: PutMetadataActionAndRespond => + standardMetadataActor forward action + awsEventBridgeMetadataActor forward PutMetadataAction(action.events) + case anythingElse => standardMetadataActor forward anythingElse + } +} + diff --git a/services/src/main/scala/cromwell/services/metadata/impl/hybridaws/HybridSnsMetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/hybridaws/HybridSnsMetadataServiceActor.scala new file mode 100644 index 00000000000..d1a05e46924 --- /dev/null +++ b/services/src/main/scala/cromwell/services/metadata/impl/hybridaws/HybridSnsMetadataServiceActor.scala @@ -0,0 +1,55 @@ +package cromwell.services.metadata.impl.aws + +import akka.actor.{Actor, ActorLogging, ActorRef} +import com.typesafe.config.Config +import cromwell.services.metadata.MetadataService.{PutMetadataAction, PutMetadataActionAndRespond} +import cromwell.services.metadata.impl.MetadataServiceActor + + +/** + * A metadata service implementation which will function as a standard metadata service but also push all metadata + * events to AWS SNS (Simple Notification Service). This class closely follows the pattern established in the + * HybridPubSubMetadataServiceActor + * + * Under the hood it maintains its own MetadataServiceActor and AwsSnsMetadataServiceActor. All messages are routed + * to the MetadataServiceActor. PutMetadataActions are also sent to the AwsSnsMetadataServiceActor. PutMetadataActionAndRespond + * messages will be sent to the SnsMetadataServiceActor as a standard PutMetadataAction, i.e. only the standard + * metadata service will be ACKing the request. + * + * To use this actor something similar to the following should be present in the cromwell.conf file: + *
+ * services { + * MetadataService { + * class="cromwell.services.metadata.impl.sns.HybridSnsMetadataServiceActor" + * config { + * aws { + * application-name = "cromwell" + * auths = [{ + * name = "default" + * scheme = "default" + * }] + * region = "us-east-1" + * topicArn = "arn:aws:sns:us-east-1:1111111111111:cromwell-metadata" + * } + * } + * } + * } + *+ * + * @see cromwell.services.metadata.impl.sns.AwsSnsMetadataServiceActor + */ +class HybridSnsMetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging { + val standardMetadataActor: ActorRef = context.actorOf(MetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor)) + val awsSnsMetadataActor: ActorRef = context.actorOf(AwsSnsMetadataServiceActor.props(serviceConfig, globalConfig, serviceRegistryActor)) + + override def receive = { + case action: PutMetadataAction => + standardMetadataActor forward action + awsSnsMetadataActor forward action + case action: PutMetadataActionAndRespond => + standardMetadataActor forward action + awsSnsMetadataActor forward PutMetadataAction(action.events) + case anythingElse => standardMetadataActor forward anythingElse + } +} + diff --git a/services/src/test/scala/cromwell/services/metadata/impl/aws/EventBridgeMetadataServiceActorSpec.scala b/services/src/test/scala/cromwell/services/metadata/impl/aws/EventBridgeMetadataServiceActorSpec.scala new file mode 100644 index 00000000000..ecfab9dc00a --- /dev/null +++ b/services/src/test/scala/cromwell/services/metadata/impl/aws/EventBridgeMetadataServiceActorSpec.scala @@ -0,0 +1,60 @@ +package cromwell.services.metadata.impl.aws + +import java.time.OffsetDateTime + +import akka.actor.{ActorInitializationException, ActorRef, Props} +import akka.testkit.{EventFilter, TestProbe} +import com.typesafe.config.{Config, ConfigFactory} +import cromwell.core.WorkflowId +import cromwell.services.ServicesSpec +import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue} + + +class AwsEventBridgeMetadataServiceActorSpec extends ServicesSpec { + import AwsEventBridgeMetadataServiceActorSpec._ + + val registryProbe: ActorRef = TestProbe().ref + + "An AwsEventBridgeMetadataActor with an empty serviceConfig" should { + "fail to build" in { + EventFilter[ActorInitializationException](occurrences = 1) intercept { + system.actorOf(Props(new AwsEventBridgeMetadataServiceActor(emptyConfig, emptyConfig, registryProbe))) + } + } + } + + "An AwsEventBridgeMetadataActor with a bus name and configuration" should { + "successfully build" in { + system.actorOf(Props(new AwsEventBridgeMetadataServiceActor(configWithBus, emptyConfig, registryProbe))) + } + + "process an event" in { + val actor = system.actorOf(Props(new AwsEventBridgeMetadataServiceActor(configWithBus, emptyConfig, registryProbe))) + actor ! event + } + } +} + +object AwsSnsMetadataServiceActorSpec { + + // This doesn't include a topic so should be a failure + val emptyConfig: Config = ConfigFactory.empty() + + val configWithBus: Config = ConfigFactory.parseString( + """ + |aws { + | application-name = "cromwell" + | auths = [{ + | name = "default" + | scheme = "default" + | }] + | region = "us-east-1" + | busName = "cromwell-metadata" + |} + """.stripMargin + ) + + val event: MetadataEvent = MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "key"), + Option(MetadataValue("value")), OffsetDateTime.now) +} + diff --git a/services/src/test/scala/cromwell/services/metadata/impl/aws/SnsMetadataServiceActorSpec.scala b/services/src/test/scala/cromwell/services/metadata/impl/aws/SnsMetadataServiceActorSpec.scala new file mode 100644 index 00000000000..26347136655 --- /dev/null +++ b/services/src/test/scala/cromwell/services/metadata/impl/aws/SnsMetadataServiceActorSpec.scala @@ -0,0 +1,60 @@ +package cromwell.services.metadata.impl.aws + +import java.time.OffsetDateTime + +import akka.actor.{ActorInitializationException, ActorRef, Props} +import akka.testkit.{EventFilter, TestProbe} +import com.typesafe.config.{Config, ConfigFactory} +import cromwell.core.WorkflowId +import cromwell.services.ServicesSpec +import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue} + + +class AwsSnsMetadataServiceActorSpec extends ServicesSpec { + import AwsSnsMetadataServiceActorSpec._ + + val registryProbe: ActorRef = TestProbe().ref + + "An AwsSnsMetadataActor with an empty serviceConfig" should { + "fail to build" in { + EventFilter[ActorInitializationException](occurrences = 1) intercept { + system.actorOf(Props(new AwsSnsMetadataServiceActor(emptyConfig, emptyConfig, registryProbe))) + } + } + } + + "An AwsSnsMetadataActor with a topic and configuration" should { + "successfully build" in { + system.actorOf(Props(new AwsSnsMetadataServiceActor(configWithTopic, emptyConfig, registryProbe))) + } + + "process an event" in { + val actor = system.actorOf(Props(new AwsSnsMetadataServiceActor(configWithTopic, emptyConfig, registryProbe))) + actor ! event + } + } +} + +object AwsSnsMetadataServiceActorSpec { + + // This doesn't include a topic so should be a failure + val emptyConfig: Config = ConfigFactory.empty() + + val configWithTopic: Config = ConfigFactory.parseString( + """ + |aws { + | application-name = "cromwell" + | auths = [{ + | name = "default" + | scheme = "default" + | }] + | region = "us-east-1" + | topicArn = "arn:aws:sns:us-east-1:1111111111111:cromwell-metadata" + |} + """.stripMargin + ) + + val event: MetadataEvent = MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "key"), + Option(MetadataValue("value")), OffsetDateTime.now) +} + diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md index 2003e0f9ef3..61bb351c7e9 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md @@ -197,6 +197,60 @@ engine { } ``` +### Metadata notifications + +Cromwell has a feature that allows it to send metadata notifications to you. These notifications are mostly state transitions (task start, task end, workflow succeeded, workflow failed, etc) but also task descriptions. + +In the AWS backend those notifications can be send to **SNS Topic** or **EventBridge Bus** and you can use them to trigger some post run jobs. Below you can find information on how to setup it. + +#### AWS SNS + +1. Create an SNS topic, add the following to your `cromwell.conf` file and replace `topicArn` with the topic's ARN you just created: + +``` +services { + MetadataService { + class="cromwell.services.metadata.impl.aws.HybridSnsMetadataServiceActor" + config { + aws { + application-name = "cromwell" + auths = [{ + name = "default" + scheme = "default" + }] + region = "us-east-1" + topicArn = "