Skip to content

Commit

Permalink
Merge pull request #75
Browse files Browse the repository at this point in the history
Supports CapacityProvider
  • Loading branch information
civitaspo authored Jun 24, 2020
2 parents c7c57ac + 614cf84 commit 02af88b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ In addition, the below configurations exist.
- **volumes_from**: Data volumes to mount from another container. (array of map, optional)
- The configuration map is the same as the snake-cased [API_VolumeFrom](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_VolumeFrom.html)
- **working_directory**: The working directory in which to run commands inside the container. (string, optional)
- **capacity_provider_strategy**: An array of capacity provider strategy items to control capacity providers. (array of map, optional)
- The configuration map is the same as the snake-cased [API CapacityProviderStrategyItem](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CapacityProviderStrategyItem.html)
- **cluster**: The short name or full Amazon Resource Name (ARN) of the cluster on which to run your task. (string, required)
- **count**: The number of instantiations of the specified task to place on your cluster. You can specify up to 10 tasks per call. (integer, optional)
- **group**: The name of the task group to associate with the task. The default value is the family name of the task definition (for example, family:my-family-name). (string, optional)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ case class EcsTaskCommandRunner(
val workingDirectory: Optional[String] = params.getOptional("working_directory", classOf[String])

// For ecs_task.run operator
val capacityProviderStrategy: Seq[Config] = params.parseListOrGetEmpty("capacity_provider_strategy", classOf[Config]).asScala.toSeq
val cluster: String = params.get("cluster", classOf[String])
val count: Optional[Int] = params.getOptional("count", classOf[Int])
val group: Optional[String] = params.getOptional("group", classOf[String])
Expand Down Expand Up @@ -175,6 +176,7 @@ case class EcsTaskCommandRunner(
protected def ecsTaskRunInternalSubTask(): Config = {
withDefaultSubTask { subTask =>
subTask.set("_type", "ecs_task.run_internal")
subTask.set("capacityProviderStrategy", capacityProviderStrategy.asJava)
subTask.set("cluster", cluster)
subTask.setOptional("count", count)
subTask.setOptional("group", group)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pro.civitaspo.digdag.plugin.ecs_task.run

import com.amazonaws.services.ecs.model.{
AwsVpcConfiguration,
CapacityProviderStrategyItem,
ContainerOverride,
Failure,
KeyValuePair,
Expand All @@ -26,6 +27,8 @@ import scala.jdk.CollectionConverters._
class EcsTaskRunInternalOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
extends AbstractEcsTaskOperator(operatorName, context, systemConfig, templateEngine) {

val capacityProviderStrategy: Seq[CapacityProviderStrategyItem] =
params.parseListOrGetEmpty("capacity_provider_strategy", classOf[Config]).asScala.map(configureCapacityProviderStrategy).map(_.get).toSeq
val cluster: String = params.get("cluster", classOf[String])
val count: Optional[Int] = params.getOptional("count", classOf[Int])
val group: Optional[String] = params.getOptional("group", classOf[String])
Expand All @@ -51,6 +54,7 @@ class EcsTaskRunInternalOperator(operatorName: String, context: OperatorContext,
protected def buildRunTaskRequest(): RunTaskRequest = {
val req: RunTaskRequest = new RunTaskRequest()

if (capacityProviderStrategy.nonEmpty) req.setCapacityProviderStrategy(capacityProviderStrategy.asJava)
req.setCluster(cluster)
if (count.isPresent) req.setCount(count.get)
if (group.isPresent) req.setGroup(group.get)
Expand All @@ -67,6 +71,21 @@ class EcsTaskRunInternalOperator(operatorName: String, context: OperatorContext,
req
}

protected def configureCapacityProviderStrategy(c: Config): Optional[CapacityProviderStrategyItem] = {
if (c.isEmpty) return Optional.absent()

val base: Optional[Int] = c.getOptional("base", classOf[Int])
val capacityProvider: Optional[String] = c.getOptional("capacity_provider", classOf[String])
val weight: Optional[Int] = c.getOptional("weight", classOf[Int])

val cps: CapacityProviderStrategyItem = new CapacityProviderStrategyItem()
if (base.isPresent) cps.setBase(base.get)
if (capacityProvider.isPresent) cps.setCapacityProvider(capacityProvider.get)
if (weight.isPresent) cps.setWeight(weight.get)

Optional.of(cps)
}

protected def configureNetworkConfiguration(c: Config): Optional[NetworkConfiguration] = {
if (c.isEmpty) return Optional.absent()

Expand Down

0 comments on commit 02af88b

Please sign in to comment.