Skip to content

Commit

Permalink
initial changes to implement quotas
Browse files Browse the repository at this point in the history
  • Loading branch information
kthare10 committed Dec 9, 2024
1 parent c81b1ab commit cc96a79
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 23 deletions.
42 changes: 41 additions & 1 deletion fabric_cf/actor/core/apis/abc_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,44 @@ def get_poas(self, *, poa_id: str = None, email: str = None, sliver_id: ID = Non
@param offset offset
@param states states
@param last_update_time last update time
"""
"""

def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int):
    """
    Create a new quota record in the database.

    Interface declaration only: this method has no body of its own, so the behaviour
    described here is the contract concrete database classes are expected to provide.
    @param project_id: UUID of the project the quota is associated with.
    @param resource_type: Type of resource (e.g., SLICE, COMPONENT).
    @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB).
    @param quota_limit: Maximum allowed usage for this resource, expressed in resource_unit.
    @return: The created `Quotas` object.
    @throws: Exception if there is an error during the creation.
    """

def get_quota_lookup(self, project_id: str):
    """
    Fetch all quotas for a given project and return them as a lookup dictionary.

    Interface declaration only: this method has no body of its own.
    @param project_id: UUID of the project whose quotas are to be fetched.
    @return: Dictionary keyed by (resource_type, resource_unit) tuples with quota details
             as values. Quota enforcement code reads the "quota_limit" and "quota_used"
             entries of each value.
    @throws: Exception if there is an error during the database interaction.
    """

def update_quota(self, reservation: ABCReservationMixin):
    """
    Update the quota usage that corresponds to a reservation.

    Interface declaration only: this method has no body of its own. Which quota records
    are touched, and by how much, is decided by the implementing class.
    @param reservation: reservation whose resource usage should be reflected in the quotas.
    @throws: Exception if there is an error during the update.
    """

def delete_quota(self, project_id: str, resource_type: str, resource_unit: str):
    """
    Delete a specific quota record.

    Interface declaration only: this method has no body of its own. The three arguments
    together identify the record to remove.
    @param project_id: UUID of the project the quota is associated with.
    @param resource_type: Type of resource (e.g., SLICE, COMPONENT).
    @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB).
    @return: True if the quota was successfully deleted, False if not found.
    @throws: Exception if there is an error during the deletion.
    """
9 changes: 9 additions & 0 deletions fabric_cf/actor/core/apis/abc_mgmt_client_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,12 @@ def reclaim_delegations(self, *, broker: ID, did: ID) -> DelegationAvro:
@return reservation
"""
raise ManageException(Constants.NOT_IMPLEMENTED)

def get_quota_lookup(self, project_id: str) -> dict:
    """
    Fetch all quotas for a given project and return them as a lookup dictionary.

    Default implementation with no body: unless a subclass overrides it, calling this
    method returns None rather than a dictionary.
    @param project_id: UUID of the project whose quotas are to be fetched.
    @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details.
    @throws: Exception if there is an error during the database interaction.
    """
    # NOTE(review): the default method just above this one ends with
    # `raise ManageException(Constants.NOT_IMPLEMENTED)`; this one silently returns None.
    # Confirm whether the difference is intentional.
6 changes: 6 additions & 0 deletions fabric_cf/actor/core/kernel/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import threading
import time
import traceback
from datetime import datetime, timezone

from typing import List, Dict

Expand Down Expand Up @@ -219,6 +220,10 @@ def close(self, *, reservation: ABCReservationMixin, force: bool = False):
self.policy.close(reservation=reservation)
reservation.close(force=force)
self.plugin.get_database().update_reservation(reservation=reservation)
## TODO release resources back if deleted before expiry
if reservation.get_term().get_remaining_length() > 0:
self.plugin.get_database().update_quota(reservation=reservation)

reservation.service_close()
except Exception as e:
err = f"An error occurred during close for reservation #{reservation.get_reservation_id()}"
Expand Down Expand Up @@ -1453,6 +1458,7 @@ def update_ticket(self, *, reservation: ABCReservationMixin, update: Reservation
self.plugin.get_database().update_reservation(reservation=reservation)
if not reservation.is_failed():
reservation.service_update_ticket()
self.plugin.get_database().update_quota(reservation=reservation)
except Exception as e:
self.logger.error(traceback.format_exc())
self.error(err=f"An error occurred during update ticket for "
Expand Down
19 changes: 18 additions & 1 deletion fabric_cf/actor/core/manage/actor_management_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, List, Dict, Tuple

from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro

from fabric_cf.actor.fim.fim_helper import FimHelper
from fabric_mb.message_bus.messages.reservation_mng import ReservationMng
from fabric_mb.message_bus.messages.result_delegation_avro import ResultDelegationAvro
Expand Down Expand Up @@ -902,4 +904,19 @@ def build_broker_query_model(self, level_0_broker_query_model: str, level: int,
end=end, includes=includes, excludes=excludes)
except Exception as e:
self.logger.error(f"Exception occurred build_broker_query_model e: {e}")
self.logger.error(traceback.format_exc())
self.logger.error(traceback.format_exc())

def get_quota_lookup(self, project_id: str, caller: AuthToken) -> dict:
    """
    Fetch all quotas for a given project as a lookup dictionary.

    Delegates to the actor database. Any exception raised by the database is logged
    (message plus traceback) and swallowed, in which case None is returned.
    @param project_id: UUID of the project whose quotas are to be fetched.
    @param caller: caller; not consulted by this method.
    @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details,
             or None if the lookup failed.
    """
    try:
        return self.db.get_quota_lookup(project_id=project_id)
    except Exception as e:
        # Name this method in the log entry. The message used to mention
        # build_broker_query_model, which pointed readers at the wrong code path.
        self.logger.error(f"Exception occurred get_quota_lookup e: {e}")
        self.logger.error(traceback.format_exc())
    return None
7 changes: 7 additions & 0 deletions fabric_cf/actor/core/manage/local/local_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,10 @@ def poa(self, *, poa: PoaAvro) -> bool:
self.on_exception(e=e, traceback_str=traceback.format_exc())

return False

def get_quota_lookup(self, project_id: str) -> dict:
    """
    Fetch the quota lookup for a project through the local manager.

    Resets the last-call status, then forwards the request to the manager together with
    this controller's auth token. An exception from the manager is handed to on_exception
    (with the formatted traceback) and the method then yields None.
    @param project_id: UUID of the project whose quotas are to be fetched.
    @return: the manager's quota lookup, or None if the manager raised.
    """
    self.clear_last()
    lookup = None
    try:
        lookup = self.manager.get_quota_lookup(project_id=project_id, caller=self.auth)
    except Exception as err:
        self.on_exception(e=err, traceback_str=traceback.format_exc())
    return lookup
92 changes: 92 additions & 0 deletions fabric_cf/actor/core/plugins/db/actor_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from datetime import datetime
from typing import List, Union, Tuple, Dict

from fim.user import ComponentType

from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin, ActorType
from fabric_cf.actor.core.apis.abc_broker_proxy import ABCBrokerProxy
from fabric_cf.actor.core.apis.abc_controller_reservation import ABCControllerReservation
Expand All @@ -44,6 +46,7 @@
from fabric_cf.actor.core.plugins.handlers.configuration_mapping import ConfigurationMapping
from fabric_cf.actor.core.container.maintenance import Site
from fabric_cf.actor.core.util.id import ID
from fabric_cf.actor.core.util.utils import extract_quota_usage
from fabric_cf.actor.db.psql_database import PsqlDatabase


Expand Down Expand Up @@ -994,3 +997,92 @@ def remove_poa(self, *, poa_id: str):
finally:
if self.lock.locked():
self.lock.release()

def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int):
    """
    Create a new quota record by delegating to the underlying database.
    @param project_id: UUID of the project the quota is associated with.
    @param resource_type: Type of resource (e.g., SLICE, COMPONENT).
    @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB).
    @param quota_limit: Maximum allowed usage for this resource.
    @return: whatever the underlying database's create_quota returns (the database
             interface documents this as the created quota record).
    @throws: Exception propagated from the underlying database.
    """
    try:
        # Hand the result back to the caller instead of discarding it, so this wrapper
        # honours the documented "returns the created record" contract.
        return self.db.create_quota(project_id=project_id,
                                    resource_type=resource_type,
                                    resource_unit=resource_unit,
                                    quota_limit=quota_limit)
    finally:
        # NOTE(review): nothing in this method acquires the lock; it is only released
        # here if it happens to be held. Confirm that is the intended pattern.
        if self.lock.locked():
            self.lock.release()

def get_quota_lookup(self, project_id: str):
    """
    Look up all quotas of a project via the underlying database.

    The lock is released on the way out if it is currently held, whether or not the
    database call succeeded.
    @param project_id: UUID of the project whose quotas are to be fetched.
    @return: the lookup produced by the underlying database.
    @throws: Exception propagated from the underlying database.
    """
    try:
        lookup = self.db.get_quota_lookup(project_id=project_id)
    finally:
        lock = self.lock
        if lock.locked():
            lock.release()
    return lookup

def update_quota(self, reservation: ABCReservationMixin):
    """
    Adjust the quota usage of the project that owns a reservation.

    The sliver is taken from the leased resources (client reservations only) or, failing
    that, from the requested resources. Its per-resource hours are computed with
    extract_quota_usage and then either
      - subtracted from the recorded usage (never going below 0) when the reservation is
        closed or closing, using the remaining term length, or
      - added to the recorded usage otherwise, using the term length.
    Nothing happens when the reservation has no slice, no project id, no sliver, or a
    duration below the threshold. Resources without a quota record are skipped.
    @param reservation: reservation whose usage is applied to the project's quotas.
    @throws: Exception propagated from the underlying database.
    """
    try:
        slice_object = reservation.get_slice()
        if not slice_object:
            return
        project_id = slice_object.get_project_id()
        if not project_id:
            return

        # Kept as a function-level import, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        from fabric_cf.actor.core.kernel.reservation_client import ReservationClient

        # Prefer what was actually leased; fall back to what was requested.
        sliver = None
        if isinstance(reservation, ReservationClient):
            leased = reservation.get_leased_resources()
            if leased:
                sliver = leased.get_sliver()
        if not sliver:
            resources = reservation.get_resources()
            if resources:
                sliver = resources.get_sliver()
        if not sliver:
            return

        # Evaluate the state once so the duration and the add/subtract decision agree.
        releasing = reservation.is_closed() or reservation.is_closing()
        term = reservation.get_term()
        duration = term.get_remaining_length() if releasing else term.get_length()

        # NOTE(review): term lengths are in milliseconds, so this threshold is 60 ms;
        # confirm whether 60 seconds was intended. It also filters out negative
        # remaining lengths of already expired terms.
        if duration < 60:
            return

        duration /= 3600000  # milliseconds -> hours
        existing_quota = self.db.get_quota_lookup(project_id=project_id) or {}
        sliver_quota_usage = extract_quota_usage(sliver=sliver, duration=duration)

        for quota_key, total_duration in sliver_quota_usage.items():
            record = existing_quota.get(quota_key)
            if not record:
                # No quota row exists for this resource, so there is nothing to update.
                # The previous code fell through with the integer 0 and then failed on
                # 0["quota_used"].
                continue
            (resource_type, resource_unit) = quota_key
            used = record["quota_used"] or 0  # a NULL usage counts as nothing used

            if releasing:
                # Return resource hours for a sliver deleted before expiry
                usage = max(used - total_duration, 0)
            else:
                # Account for resource hours used for a new or extended sliver
                usage = used + total_duration

            self.db.update_quota(project_id=project_id,
                                 resource_type=resource_type,
                                 resource_unit=resource_unit, quota_used=usage)
    finally:
        if self.lock.locked():
            self.lock.release()

def delete_quota(self, project_id: str, resource_type: str, resource_unit: str):
    """
    Delete a specific quota record by delegating to the underlying database.
    @param project_id: UUID of the project the quota is associated with.
    @param resource_type: Type of resource (e.g., SLICE, COMPONENT).
    @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB).
    @return: whatever the underlying database's delete_quota returns (the database
             interface documents True when deleted, False when not found).
    @throws: Exception propagated from the underlying database.
    """
    try:
        # Propagate the outcome so callers can tell "deleted" from "not found".
        return self.db.delete_quota(project_id=project_id,
                                    resource_type=resource_type,
                                    resource_unit=resource_unit)
    finally:
        # NOTE(review): nothing in this method acquires the lock; it is only released
        # here if it happens to be held. Confirm that is the intended pattern.
        if self.lock.locked():
            self.lock.release()
12 changes: 12 additions & 0 deletions fabric_cf/actor/core/time/term.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,18 @@ def get_full_length(self) -> int:

return end_ms - start_ms + 1

def get_remaining_length(self) -> int:
    """
    Returns how much of the term is left, in milliseconds, counted over the closed
    interval [now, end] (hence the + 1). "now" is the current UTC time at the moment
    of the call.
    @returns remaining term length
    """
    now_ms = ActorClock.to_milliseconds(when=datetime.now(timezone.utc))
    return ActorClock.to_milliseconds(when=self.end_time) - now_ms + 1

def get_length(self) -> int:
"""
Returns the length of a term in milliseconds. The length of a term is the
Expand Down
84 changes: 82 additions & 2 deletions fabric_cf/actor/core/util/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
# Author Komal Thareja (kthare10@renci.org)
import hashlib
from bisect import bisect_left
from typing import Tuple, Dict

from fabric_mb.message_bus.messages.abc_message_avro import AbcMessageAvro
from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro
from fim.slivers.base_sliver import BaseSliver
from fim.slivers.network_node import NodeSliver
from fim.slivers.network_service import NetworkServiceSliver
from fim.user import ComponentType
from fim.user.topology import TopologyDiff, TopologyDiffTuple
from fim.user import ComponentType, InstanceCatalog

from fabric_cf.actor.security.pdp_auth import ActionId

Expand Down Expand Up @@ -118,3 +119,82 @@ def generate_sha256(*, token: str):
sha256_hex = sha256_hash.hexdigest()

return sha256_hex


def extract_quota_usage(sliver, duration: float) -> Dict[Tuple[str, str], float]:
    """
    Extract quota usage from a sliver.

    Only NodeSliver objects contribute; any other sliver yields an empty dictionary.
    Core, RAM and Disk usage is duration multiplied by the respective capacity; every
    attached component contributes duration to the entry of its component type.
    The capacities are taken from, in order of preference: the capacity allocations,
    the instance type named in the capacity hints, the requested capacities.
    @param sliver: The sliver object from which resources are extracted.
    @param duration: Number of hours the resources are requested for.
    @return: A dictionary of resource type/unit tuples to requested amounts.
    """
    unit = "HOURS"
    requested_resources = {}

    # Check if the sliver is a NodeSliver
    if not isinstance(sliver, NodeSliver):
        return requested_resources

    # Keep real allocations when present. Previously the else branch replaced them with
    # the requested capacities, making the allocation lookup pointless.
    allocations = sliver.get_capacity_allocations()
    if not allocations:
        hints = sliver.get_capacity_hints()
        if hints:
            allocations = InstanceCatalog().get_instance_capacities(instance_type=hints.instance_type)
        else:
            allocations = sliver.get_capacities()

    # Extract Core, Ram, Disk Hours. Each entry is computed on its own; RAM and Disk used
    # to be seeded from the Core entry, which added the core-hours into both.
    if allocations is not None:
        requested_resources[("Core", unit)] = duration * allocations.core
        requested_resources[("RAM", unit)] = duration * allocations.ram
        requested_resources[("Disk", unit)] = duration * allocations.disk

    # Extract component hours (e.g., GPU, FPGA, SmartNIC); several components of the
    # same type accumulate into one entry.
    if sliver.attached_components_info:
        for c in sliver.attached_components_info.devices.values():
            key = (str(c.get_type()), unit)
            requested_resources[key] = requested_resources.get(key, 0) + duration

    return requested_resources


def enforce_quota_limits(quota_lookup: dict, computed_reservations: list[LeaseReservationAvro],
                         duration: float) -> Tuple[bool, str]:
    """
    Check if the requested resources for multiple reservations are within the project's quota limits.

    Usage is summed per (resource_type, resource_unit) over all reservations and each total
    is compared with quota_limit - quota_used of the matching quota entry. A resource with
    no quota entry is denied.
    @param quota_lookup: Quota Limits for various resource types, keyed by
                         (resource_type, resource_unit); each value provides "quota_limit"
                         and "quota_used".
    @param computed_reservations: List of slivers requested.
    @param duration: Number of hours the reservations are requested for.
    @return: Tuple (True, None) if resources are within quota, or (False, message) if denied.
    @throws: Exception (chained to the original error) if anything fails during the check.
    """
    try:
        requested_resources = {}

        # Accumulate resource usage for all reservations
        for r in computed_reservations:
            sliver_resources = extract_quota_usage(r.get_sliver(), duration)
            for key, value in sliver_resources.items():
                requested_resources[key] = requested_resources.get(key, 0) + value

        # Check each accumulated resource usage against its quota
        for quota_key, total_requested_duration in requested_resources.items():
            if quota_key not in quota_lookup:
                return False, f"Quota not defined for resource: {quota_key[0]} ({quota_key[1]})."

            quota_info = quota_lookup[quota_key]
            # quota_used may be NULL in the database; treat that as nothing used so far.
            available_quota = quota_info["quota_limit"] - (quota_info["quota_used"] or 0)

            if total_requested_duration > available_quota:
                return False, (
                    f"Requested {total_requested_duration} {quota_key[1]} of {quota_key[0]}, "
                    f"but only {available_quota} is available."
                )

        # If all checks pass
        return True, None
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"Error while checking reservation: {str(e)}") from e
24 changes: 22 additions & 2 deletions fabric_cf/actor/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
#
# Author: Komal Thareja (kthare10@renci.org)

from sqlalchemy import JSON, ForeignKey, LargeBinary, Index, TIMESTAMP
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import JSON, ForeignKey, LargeBinary, Index, TIMESTAMP, UUID, func, text, Float
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Integer, Sequence
from sqlalchemy.orm import relationship


Base = declarative_base()

FOREIGN_KEY_ACTOR_ID = 'Actors.act_id'
Expand Down Expand Up @@ -241,3 +241,23 @@ class Components(Base):
bdf = Column(String, primary_key=True, index=True)
reservation = relationship('Reservations', back_populates='components')


class Quotas(Base):
    """
    ORM mapping for the "quotas" table.

    A row is identified by the combination of resource_type, project_id and
    resource_unit (all three are primary-key columns) and stores the limit and the
    amount used for that combination.
    """
    __tablename__ = "quotas"

    # Primary-key columns: together they identify one quota row.
    resource_type = Column(String(50), primary_key=True, index=True, nullable=False)
    project_id = Column(UUID(as_uuid=True), primary_key=True, index=True, nullable=False)
    resource_unit = Column(String(20), primary_key=True, index=True, nullable=False, default="HOURS")
    # Maximum allowed usage; required.
    quota_limit = Column(Float, nullable=False)
    # Usage recorded so far. Not declared nullable=False, so readers should be prepared
    # for NULL; 0 is only the default applied when no value is supplied.
    quota_used = Column(Float, default=0)
    # Creation timestamp, filled in by the database server.
    created_at = Column(
        TIMESTAMP(timezone=True),
        nullable=False,
        server_default=text("timezone('utc', now())")  # Explicitly ensure UTC in PostgreSQL
    )
    # Last-modification timestamp: server default on insert, now() on update.
    # NOTE(review): the insert default wraps now() in timezone('utc', ...) while the
    # update expression is a plain now(); confirm the two are meant to differ.
    updated_at = Column(
        TIMESTAMP(timezone=True),
        nullable=False,
        server_default=text("timezone('utc', now())"),
        onupdate=func.now()
    )
Loading

0 comments on commit cc96a79

Please sign in to comment.