Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS Capacity Reservation support #1977

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/docs/reference/server/config.yml.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ There are two ways to configure AWS: using an access key or using the default cr
"ec2:AttachVolume",
"ec2:AuthorizeSecurityGroupEgress",
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CreatePlacementGroup",
"ec2:CancelSpotInstanceRequests",
"ec2:CreateSecurityGroup",
"ec2:CreateTags",
"ec2:CreateVolume",
"ec2:DeletePlacementGroup",
"ec2:DeleteVolume",
"ec2:DescribeAvailabilityZones",
"ec2:DescribeCapacityReservations"
"ec2:DescribeImages",
"ec2:DescribeInstances",
"ec2:DescribeInstanceAttribute",
Expand Down
3 changes: 3 additions & 0 deletions src/dstack/_internal/cli/services/configurators/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def th(s: str) -> str:
fleet_type = "cloud"
nodes = plan.spec.configuration.nodes or "-"
placement = plan.spec.configuration.placement or InstanceGroupPlacement.ANY
reservation = plan.spec.configuration.reservation
backends = None
if plan.spec.configuration.backends is not None:
backends = ", ".join(b.value for b in plan.spec.configuration.backends)
Expand Down Expand Up @@ -287,6 +288,8 @@ def th(s: str) -> str:
configuration_table.add_row(th("Resources"), resources)
if spot_policy is not None:
configuration_table.add_row(th("Spot policy"), spot_policy)
if reservation is not None:
configuration_table.add_row(th("Reservation"), reservation)

offers_table = Table(box=None)
offers_table.add_column("#")
Expand Down
8 changes: 8 additions & 0 deletions src/dstack/_internal/cli/utils/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def print_fleets_table(fleets: List[Fleet], verbose: bool = False) -> None:
def get_fleets_table(fleets: List[Fleet], verbose: bool = False) -> Table:
table = Table(box=None)
table.add_column("FLEET", no_wrap=True)
if verbose:
table.add_column("RESERVATION")
table.add_column("INSTANCE")
table.add_column("BACKEND")
table.add_column("RESOURCES")
Expand Down Expand Up @@ -59,6 +61,12 @@ def get_fleets_table(fleets: List[Fleet], verbose: bool = False) -> Table:
pretty_date(instance.created),
]

if verbose and i == 0:
row.insert(
1,
fleet.spec.configuration.reservation if i == 0 else "",
)

if verbose:
error = ""
if instance.status == InstanceStatus.TERMINATED and instance.termination_reason:
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/cli/utils/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def th(s: str) -> str:
props.add_row(th("Creation policy"), creation_policy)
props.add_row(th("Termination policy"), termination_policy)
props.add_row(th("Termination idle time"), termination_idle_time)
props.add_row(th("Reservation"), run_plan.run_spec.configuration.reservation)

offers = Table(box=None)
offers.add_column("#")
Expand Down Expand Up @@ -125,6 +126,8 @@ def get_runs_table(
table.add_column("INSTANCE", no_wrap=True)
table.add_column("RESOURCES")
table.add_column("SPOT")
if verbose:
table.add_column("RESERVATION", no_wrap=True)
table.add_column("PRICE", no_wrap=True)
table.add_column("STATUS", no_wrap=True)
table.add_column("SUBMITTED", style="grey58", no_wrap=True)
Expand Down Expand Up @@ -161,6 +164,7 @@ def get_runs_table(
"INSTANCE": jpd.instance_type.name,
"RESOURCES": jpd.instance_type.resources.pretty_format(),
"SPOT": "yes" if jpd.instance_type.resources.spot else "no",
"RESERVATION": jpd.reservation,
"PRICE": f"${jpd.price:.4}",
}
)
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/core/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT = [
BackendType.AWS,
]
BACKENDS_WITH_RESERVATION_SUPPORT = [
BackendType.AWS,
]

BACKENDS_WITH_GATEWAY_SUPPORT = [
BackendType.AWS,
BackendType.AZURE,
Expand Down
46 changes: 45 additions & 1 deletion src/dstack/_internal/core/backends/aws/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,40 @@ def __init__(self, config: AWSConfig):
def get_offers(
self, requirements: Optional[Requirements] = None
) -> List[InstanceOfferWithAvailability]:
filter = _supported_instances
if requirements and requirements.reservation:
region_to_reservation = {}
for region in self.config.regions:
reservation = aws_resources.get_reservation(
ec2_client=self.session.client("ec2", region_name=region),
reservation_id=requirements.reservation,
instance_count=1,
)
if reservation is not None:
region_to_reservation[region] = reservation

def _supported_instances_with_reservation(offer: InstanceOffer) -> bool:
# Filter: only instance types supported by dstack
if not _supported_instances(offer):
return False
# Filter: Spot instances can't be used with reservations
if offer.instance.resources.spot:
return False
region = offer.region
reservation = region_to_reservation.get(region)
# Filter: only instance types matching the capacity reservation
if not bool(reservation and offer.instance.name == reservation["InstanceType"]):
return False
return True

filter = _supported_instances_with_reservation

offers = get_catalog_offers(
backend=BackendType.AWS,
locations=self.config.regions,
requirements=requirements,
configurable_disk_size=CONFIGURABLE_DISK_SIZE,
extra_filter=_supported_instances,
extra_filter=filter,
)
regions = set(i.region for i in offers)

Expand Down Expand Up @@ -160,6 +188,19 @@ def create_instance(
ec2_client=ec2_client,
subnet_ids=subnet_ids,
)
if instance_config.reservation:
reservation = aws_resources.get_reservation(
ec2_client=ec2_client,
reservation_id=instance_config.reservation,
instance_count=1,
)
if reservation is not None:
# Filter out az different from capacity reservation
subnet_id_to_az_map = {
k: v
for k, v in subnet_id_to_az_map.items()
if v == reservation["AvailabilityZone"]
}
except botocore.exceptions.ClientError as e:
logger.warning("Got botocore.exceptions.ClientError: %s", e)
raise NoCapacityError()
Expand Down Expand Up @@ -193,6 +234,7 @@ def create_instance(
allocate_public_ip=allocate_public_ip,
placement_group_name=instance_config.placement_group_name,
enable_efa=enable_efa,
reservation_id=instance_config.reservation,
)
)
instance = response[0]
Expand All @@ -212,6 +254,7 @@ def create_instance(
internal_ip=instance.private_ip_address,
region=instance_offer.region,
availability_zone=az,
reservation=instance.capacity_reservation_id,
price=instance_offer.price,
username=username,
ssh_port=22,
Expand Down Expand Up @@ -241,6 +284,7 @@ def run_job(
],
job_docker_config=None,
user=run.user,
reservation=run.run_spec.configuration.reservation,
)
if len(volumes) > 0:
volume = volumes[0]
Expand Down
38 changes: 38 additions & 0 deletions src/dstack/_internal/core/backends/aws/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def create_instances_struct(
allocate_public_ip: bool = True,
placement_group_name: Optional[str] = None,
enable_efa: bool = False,
reservation_id: Optional[str] = None,
) -> Dict[str, Any]:
struct: Dict[str, Any] = dict(
BlockDeviceMappings=[
Expand Down Expand Up @@ -205,6 +206,11 @@ def create_instances_struct(
"GroupName": placement_group_name,
}

if reservation_id is not None:
struct["CapacityReservationSpecification"] = {
"CapacityReservationTarget": {"CapacityReservationId": reservation_id}
}

return struct


Expand Down Expand Up @@ -596,3 +602,35 @@ def _is_private_subnet_with_internet_egress(
return True

return False


def get_reservation(
ec2_client: botocore.client.BaseClient,
reservation_id: str,
instance_count: int = 0,
instance_types: List[str] = None,
is_capacity_block: bool = False,
) -> Optional[Dict[str, Any]]:
filters = [{"Name": "state", "Values": ["active"]}]
if instance_types:
filters.append({"Name": "instance-type", "Values": instance_types})
try:
response = ec2_client.describe_capacity_reservations(
CapacityReservationIds=[reservation_id], Filters=filters
)
except botocore.exceptions.ClientError as e:
if e.response.get("Error", {}).get("Code") == "InvalidCapacityReservationId.NotFound":
logger.debug(
"Skipping reservation %s . Capacity Reservation not found.", reservation_id
)
return None
raise
reservation = response["CapacityReservations"][0]

if instance_count > 0 and reservation["AvailableInstanceCount"] < instance_count:
return None

if is_capacity_block and reservation["ReservationType"] != "capacity-block":
return None

return reservation
4 changes: 4 additions & 0 deletions src/dstack/_internal/core/models/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class InstanceGroupParams(CoreModel):
Optional[InstanceGroupPlacement],
Field(description="The placement of instances: `any` or `cluster`"),
] = None
reservation: Annotated[
Optional[str],
Field(description="The existing reservation for the instances"),
] = None
Comment on lines +103 to +106
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pointing out another client backward incompatibility that we need to address.

resources: Annotated[
Optional[ResourcesSpec],
Field(description="The resources requirements"),
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/core/models/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class InstanceConfiguration(CoreModel):
instance_id: Optional[str] = None
availability_zone: Optional[str] = None
placement_group_name: Optional[str] = None
reservation: Optional[str] = None
job_docker_config: Optional[DockerConfig] # FIXME: cannot find any usages – remove?

def get_public_keys(self) -> List[str]:
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class ProfileParams(CoreModel):
description="The cloud-specific instance types to consider for provisioning (e.g., `[p3.8xlarge, n1-standard-4]`)"
),
]
reservation: Annotated[
Optional[str],
Field(description="The existing reservation for the instances"),
]
spot_policy: Annotated[
Optional[SpotPolicy],
Field(
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class Requirements(CoreModel):
resources: ResourcesSpec
max_price: Optional[float]
spot: Optional[bool]
reservation: Optional[str]

def pretty_format(self, resources_only: bool = False):
res = self.resources.pretty_format()
Expand Down Expand Up @@ -210,6 +211,7 @@ class JobProvisioningData(CoreModel):
instance_network: Optional[str] = None
region: str
availability_zone: Optional[str] = None
reservation: Optional[str] = None
price: float
username: str
# ssh_port be different from 22 for some backends.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ def _get_or_create_fleet_model_for_job(
configuration=FleetConfiguration(
name=run.run_spec.run_name,
placement=placement,
reservation=run.run_spec.configuration.reservation,
),
profile=run.run_spec.merged_profile,
autocreated=True,
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/server/services/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ async def create_fleet(
pool=pool,
spec=spec,
placement_group_name=placement_group_name,
reservation=spec.configuration.reservation,
instance_num=i,
)
fleet_model.instances.append(instance_model)
Expand All @@ -287,6 +288,7 @@ async def create_fleet_instance_model(
pool: PoolModel,
spec: FleetSpec,
placement_group_name: Optional[str],
reservation: Optional[str],
instance_num: int,
) -> InstanceModel:
profile = spec.merged_profile
Expand All @@ -301,6 +303,7 @@ async def create_fleet_instance_model(
instance_name=f"{spec.configuration.name}-{instance_num}",
instance_num=instance_num,
placement_group_name=placement_group_name,
reservation=reservation,
)
return instance_model

Expand Down Expand Up @@ -647,6 +650,7 @@ def _get_fleet_requirements(fleet_spec: FleetSpec) -> Requirements:
resources=fleet_spec.configuration.resources or ResourcesSpec(),
max_price=profile.max_price,
spot=get_policy_map(profile.spot_policy, default=SpotPolicy.ONDEMAND),
reservation=fleet_spec.configuration.reservation,
)
return requirements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def _requirements(self) -> Requirements:
resources=self.run_spec.configuration.resources,
max_price=self.run_spec.merged_profile.max_price,
spot=None if spot_policy == SpotPolicy.AUTO else (spot_policy == SpotPolicy.SPOT),
reservation=self.run_spec.merged_profile.reservation,
)

def _retry(self) -> Optional[Retry]:
Expand Down
6 changes: 6 additions & 0 deletions src/dstack/_internal/server/services/offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dstack._internal.core.backends import (
BACKENDS_WITH_CREATE_INSTANCE_SUPPORT,
BACKENDS_WITH_MULTINODE_SUPPORT,
BACKENDS_WITH_RESERVATION_SUPPORT,
)
from dstack._internal.core.backends.base import Backend
from dstack._internal.core.models.backends.base import BackendType
Expand Down Expand Up @@ -53,6 +54,11 @@ async def get_offers_by_requirements(
backend_types = BACKENDS_WITH_CREATE_INSTANCE_SUPPORT
backend_types = [b for b in backend_types if b in BACKENDS_WITH_CREATE_INSTANCE_SUPPORT]

if profile.reservation is not None:
if not backend_types:
backend_types = BACKENDS_WITH_RESERVATION_SUPPORT
backend_types = [b for b in backend_types if b in BACKENDS_WITH_RESERVATION_SUPPORT]

# For multi-node, restrict backend and region.
# The default behavior is to provision all nodes in the same backend and region.
if master_job_provisioning_data is not None:
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/services/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ async def create_instance_model(
instance_name: str,
instance_num: int,
placement_group_name: Optional[str],
reservation: Optional[str],
) -> InstanceModel:
instance = InstanceModel(
id=uuid.uuid4(),
Expand Down Expand Up @@ -642,6 +643,7 @@ async def create_instance_model(
instance_id=str(instance.id),
ssh_keys=[project_ssh_key],
placement_group_name=placement_group_name,
reservation=reservation,
job_docker_config=DockerConfig(
image=dstack_default_image,
registry_auth=None,
Expand Down
Loading