Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

402 advance scheduling improvements #403

Merged
merged 38 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d128b3c
load CBM as part of controller startup
kthare10 Sep 20, 2024
f80fc4f
check combined_broker_model_graph_id in restore
kthare10 Sep 20, 2024
b02dc5b
check combined_broker_model_graph_id in restore
kthare10 Sep 20, 2024
cbab262
check combined_broker_model_graph_id in restore
kthare10 Sep 20, 2024
fb7af15
load cbm
kthare10 Sep 20, 2024
ce22f57
load cbm from oc kernel
kthare10 Sep 20, 2024
3604288
load bqm model on start
kthare10 Sep 20, 2024
da4ed2d
load bqm model on start
kthare10 Sep 20, 2024
5ea5a81
fix get_management_actor call to oh
kthare10 Sep 20, 2024
c1e28ab
initial code for determining start time
kthare10 Oct 8, 2024
85c3b14
remove unused imports and variables
kthare10 Oct 8, 2024
e858416
refactored broker policy code so it can be reused from orchestrator
kthare10 Oct 15, 2024
c02158b
added resource tracker
kthare10 Oct 28, 2024
1957845
added resource tracker
kthare10 Oct 28, 2024
cc1ee50
extract delegated capacity function to FimHelper
kthare10 Oct 28, 2024
7e673d9
Updated Orchestrator Kernel to find start time for a slice given the …
kthare10 Oct 29, 2024
ea20d17
Update API for advanced scheduling and its handling to auto determine…
kthare10 Oct 29, 2024
e3934ef
error checks on lease start and end time
kthare10 Oct 29, 2024
ed3eebe
merge from main
kthare10 Oct 29, 2024
df64783
merge from rel1.8
kthare10 Oct 29, 2024
3169ddd
fix logger in kernel
kthare10 Oct 29, 2024
d03aec9
fix merge issues
kthare10 Oct 29, 2024
3b7de0b
remove unneeded variables
kthare10 Oct 29, 2024
49cfc44
remove duplicate imports
kthare10 Oct 30, 2024
f3285be
max default duration
kthare10 Oct 30, 2024
f8713b0
use capacities instead of capacity delegations
kthare10 Oct 30, 2024
5d74fe8
create tracker objects once per CBM Node
kthare10 Oct 30, 2024
474d3ee
add log and update future start time in DB for Slice
kthare10 Oct 30, 2024
2e4cdf7
NetworkNodeInventory.get_delgations to be changed to FimHelper.get_de…
kthare10 Oct 30, 2024
9b40eef
advance scheduling thread to process future slices
kthare10 Nov 5, 2024
9c62d14
Add slivers only after lease time has been computed
kthare10 Nov 5, 2024
041c5b1
fix call to NetworkServiceInventory.get_delegations
kthare10 Nov 5, 2024
1ffff6c
get matching components to check for comp count based on capacities f…
kthare10 Nov 6, 2024
64bcb66
update the version number
kthare10 Nov 6, 2024
780b54a
bump cryptography version
kthare10 Nov 6, 2024
e60e10e
update dependencies
kthare10 Nov 6, 2024
46e742b
update dependencies
kthare10 Nov 6, 2024
19edc39
use release version for fss-uitls
kthare10 Nov 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fabric_cf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = "1.7.0"
__version__ = "1.8.0b1"
__VERSION__ = __version__
24 changes: 22 additions & 2 deletions fabric_cf/actor/core/apis/abc_actor_management_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ def get_sites(self, *, caller: AuthToken, site: str) -> ResultSitesAvro:
def get_reservations(self, *, caller: AuthToken, states: List[int] = None,
slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None,
email: str = None, rid_list: List[str] = None, type: str = None,
site: str = None, node_id: str = None,
host: str = None, ip_subnet: str = None, full: bool = False) -> ResultReservationAvro:
site: str = None, node_id: str = None, host: str = None, ip_subnet: str = None,
full: bool = False, start: datetime = None, end: datetime = None) -> ResultReservationAvro:
"""
Get Reservations
@param states states
Expand All @@ -256,10 +256,30 @@ def get_reservations(self, *, caller: AuthToken, states: List[int] = None,
@param host host
@param ip_subnet ip subnet
@param full
@param start: start time
@param end: end time

@return returns list of the reservations
"""

def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int],
component: str = None, bdf: str = None, start: datetime = None,
end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]:
"""
Returns components matching the search criteria
@param node_id: Worker Node ID to which components belong
@param states: list of states used to find reservations
@param rsv_type: type of reservations
@param component: component name
@param bdf: Component's PCI address
@param start: start time
@param end: end time
@param excludes: Excludes the list of reservations
NOTE# For P4 switches; node_id=node+renc-p4-sw component=ip+192.168.11.8 bdf=p1

@return Dictionary with component name as the key and value as list of associated PCI addresses in use.
"""

def get_slices(self, *, slice_id: ID, caller: AuthToken, slice_name: str = None, email: str = None,
states: List[int] = None, project: str = None, limit: int = None,
offset: int = None, user_id: str = None, search: str = None,
Expand Down
25 changes: 24 additions & 1 deletion fabric_cf/actor/core/apis/abc_mgmt_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from __future__ import annotations

from abc import abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, List, Tuple, Dict

from fabric_mb.message_bus.messages.delegation_avro import DelegationAvro
Expand Down Expand Up @@ -151,7 +152,8 @@ def accept_update_slice(self, *, slice_id: ID) -> bool:
def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None,
type: str = None, site: str = None, node_id: str = None,
host: str = None, ip_subnet: str = None, full: bool = False) -> List[ReservationMng]:
host: str = None, ip_subnet: str = None, full: bool = False,
start: datetime = None, end: datetime = None) -> List[ReservationMng]:
"""
Get Reservations
@param states states
Expand All @@ -166,10 +168,31 @@ def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
@param ip_subnet ip subnet
@param host host
@param full
@param start: start time
@param end: end time
Obtains all reservations
@return returns list of the reservations
"""

def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int],
component: str = None, bdf: str = None, start: datetime = None,
end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]:
"""
Returns components matching the search criteria
@param node_id: Worker Node ID to which components belong
@param states: list of states used to find reservations
@param rsv_type: type of reservations
@param component: component name
@param bdf: Component's PCI address
@param start: start time
@param end: end time
@param excludes: Excludes the list of reservations
NOTE# For P4 switches; node_id=node+renc-p4-sw component=ip+192.168.11.8 bdf=p1

@return Dictionary with component name as the key and value as list of associated PCI addresses in use.
"""
raise NotImplementedError

@abstractmethod
def get_sites(self, *, site: str) -> List[SiteAvro] or None:
"""
Expand Down
2 changes: 1 addition & 1 deletion fabric_cf/actor/core/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class Constants:

# Orchestrator Lease params
TWO_WEEKS = timedelta(days=15)
DEFAULT_MAX_DURATION = TWO_WEEKS
DEFAULT_MAX_DURATION_IN_WEEKS = TWO_WEEKS
LEASE_TIME_FORMAT = "%Y-%m-%d %H:%M:%S %z"
DEFAULT_LEASE_IN_HOURS = 24
LONG_LIVED_SLICE_TIME_WEEKS = timedelta(weeks=26)
Expand Down
5 changes: 0 additions & 5 deletions fabric_cf/actor/core/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ def __init__(self, *, identity: AuthToken = None, clock: ActorClock = None):
thread_name_prefix=self.__class__.__name__)
self.pluggable_registry = PluggableRegistry()

self.combined_broker_model = None
self.combined_broker_model_graph_id = None

def __getstate__(self):
state = self.__dict__.copy()
del state['recovered']
Expand All @@ -116,8 +113,6 @@ def __getstate__(self):
if hasattr(self, 'pluggable_registry'):
del state['pluggable_registry']

del state['combined_broker_model']

return state

def __setstate__(self, state):
Expand Down
16 changes: 16 additions & 0 deletions fabric_cf/actor/core/core/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#
#
# Author: Komal Thareja (kthare10@renci.org)
import enum
from enum import Enum

from fabric_cf.actor.boot.configuration import ActorConfig
from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin
from fabric_cf.actor.core.apis.abc_delegation import ABCDelegation
Expand All @@ -36,6 +39,19 @@
from fabric_cf.actor.core.kernel.resource_set import ResourceSet


class AllocationAlgorithm(Enum):
FirstFit = enum.auto()
BestFit = enum.auto()
WorstFit = enum.auto()
Random = enum.auto()

def __repr__(self):
return self.name

def __str__(self):
return self.name


class Policy(ABCPolicy):
"""
Base class for all policy implementations.
Expand Down
13 changes: 10 additions & 3 deletions fabric_cf/actor/core/manage/actor_management_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,17 @@ def get_sites(self, *, caller: AuthToken, site: str) -> ResultSitesAvro:

return result

def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int],
component: str = None, bdf: str = None, start: datetime = None,
end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]:
return self.db.get_components(node_id=node_id, rsv_type=rsv_type, states=states,
component=component, bdf=bdf, start=start, end=end, excludes=excludes)

def get_reservations(self, *, caller: AuthToken, states: List[int] = None,
slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None,
email: str = None, rid_list: List[str] = None, type: str = None,
site: str = None, node_id: str = None, host: str = None,
ip_subnet: str = None, full: bool = False) -> ResultReservationAvro:
site: str = None, node_id: str = None, host: str = None, ip_subnet: str = None,
full: bool = False, start: datetime = None, end: datetime = None) -> ResultReservationAvro:
result = ResultReservationAvro()
result.status = ResultAvro()

Expand All @@ -452,7 +458,8 @@ def get_reservations(self, *, caller: AuthToken, states: List[int] = None,
else:
res_list = self.db.get_reservations(slice_id=slice_id, rid=rid, email=email,
states=states, rsv_type=rsv_type, site=site,
graph_node_id=node_id, host=host, ip_subnet=ip_subnet)
graph_node_id=node_id, host=host, ip_subnet=ip_subnet,
start=start, end=end)
except Exception as e:
self.logger.error("getReservations:db access {}".format(e))
result.status.set_code(ErrorCodes.ErrorDatabaseError.value)
Expand Down
4 changes: 3 additions & 1 deletion fabric_cf/actor/core/manage/kafka/kafka_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Author: Komal Thareja (kthare10@renci.org)
from __future__ import annotations

from datetime import datetime
from typing import List

from fabric_mb.message_bus.messages.close_delegations_avro import CloseDelegationsAvro
Expand Down Expand Up @@ -132,7 +133,8 @@ def delete_slice(self, *, slice_id: ID) -> bool:
def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None,
type: str = None, site: str = None, node_id: str = None,
host: str = None, ip_subnet: str = None, full: bool = False) -> List[ReservationMng]:
host: str = None, ip_subnet: str = None, full: bool = False,
start: datetime = None, end: datetime = None) -> List[ReservationMng]:
request = GetReservationsRequestAvro()
request = self.fill_request_by_id_message(request=request, slice_id=slice_id,
states=states, email=email, rid=rid,
Expand Down
19 changes: 15 additions & 4 deletions fabric_cf/actor/core/manage/local/local_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from __future__ import annotations

import traceback
from datetime import datetime
from typing import TYPE_CHECKING, List, Tuple, Dict

from fabric_mb.message_bus.messages.delegation_avro import DelegationAvro
Expand All @@ -44,12 +45,11 @@
from fabric_mb.message_bus.messages.reservation_mng import ReservationMng
from fabric_mb.message_bus.messages.reservation_state_avro import ReservationStateAvro
from fabric_mb.message_bus.messages.slice_avro import SliceAvro
from fabric_cf.actor.core.manage.management_object import ManagementObject
from fabric_cf.actor.security.auth_token import AuthToken


class LocalActor(LocalProxy, ABCMgmtActor):
def __init__(self, *, manager: ManagementObject, auth: AuthToken):
def __init__(self, *, manager: ActorManagementObject, auth: AuthToken):
super().__init__(manager=manager, auth=auth)

if not isinstance(manager, ActorManagementObject):
Expand Down Expand Up @@ -111,13 +111,14 @@ def remove_slice(self, *, slice_id: ID) -> bool:
def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None,
type: str = None, site: str = None, node_id: str = None,
host: str = None, ip_subnet: str = None, full: bool = False) -> List[ReservationMng]:
host: str = None, ip_subnet: str = None, full: bool = False,
start: datetime = None, end: datetime = None) -> List[ReservationMng]:
self.clear_last()
try:
result = self.manager.get_reservations(caller=self.auth, states=states, slice_id=slice_id, rid=rid,
oidc_claim_sub=oidc_claim_sub, email=email, rid_list=rid_list,
type=type, site=site, node_id=node_id, host=host,
ip_subnet=ip_subnet, full=full)
ip_subnet=ip_subnet, full=full, start=start, end=end)
self.last_status = result.status

if result.status.get_code() == 0:
Expand All @@ -126,6 +127,16 @@ def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
except Exception as e:
self.on_exception(e=e, traceback_str=traceback.format_exc())

def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int],
component: str = None, bdf: str = None, start: datetime = None,
end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]:
try:
return self.manager.get_components(node_id=node_id, rsv_type=rsv_type, states=states,
component=component, bdf=bdf, start=start,
end=end, excludes=excludes)
except Exception as e:
self.on_exception(e=e, traceback_str=traceback.format_exc())

def get_sites(self, *, site: str) -> List[SiteAvro] or None:
self.clear_last()
try:
Expand Down
1 change: 1 addition & 0 deletions fabric_cf/actor/core/manage/management_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ManagementUtils:
@staticmethod
def update_slice(*, slice_obj: ABCSlice, slice_mng: SliceAvro) -> ABCSlice:
slice_obj.set_graph_id(graph_id=slice_mng.get_graph_id())
slice_obj.set_lease_start(lease_start=slice_mng.get_lease_start())
slice_obj.set_lease_end(lease_end=slice_mng.get_lease_end())
slice_obj.set_config_properties(value=slice_mng.get_config_properties())
return slice_obj
Expand Down
49 changes: 19 additions & 30 deletions fabric_cf/actor/core/policy/broker_simpler_units_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from fim.slivers.attached_components import ComponentSliver, ComponentType
from fim.slivers.base_sliver import BaseSliver
from fim.slivers.capacities_labels import Labels, Capacities
from fim.slivers.interface_info import InterfaceSliver, InterfaceType
from fim.slivers.interface_info import InterfaceType
from fim.slivers.network_node import NodeSliver, NodeType
from fim.slivers.network_service import NetworkServiceSliver, ServiceType, NSLayer
from fim.slivers.path_info import Path
Expand All @@ -50,6 +50,7 @@
from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin
from fabric_cf.actor.core.common.constants import Constants
from fabric_cf.actor.core.container.maintenance import Maintenance
from fabric_cf.actor.core.core.policy import AllocationAlgorithm
from fabric_cf.actor.core.delegation.resource_ticket import ResourceTicketFactory
from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode
from fabric_cf.actor.core.kernel.reservation_states import ReservationStates, ReservationOperation
Expand All @@ -75,19 +76,6 @@
from fabric_cf.actor.core.apis.abc_broker_mixin import ABCBrokerMixin


class BrokerAllocationAlgorithm(Enum):
FirstFit = enum.auto()
BestFit = enum.auto()
WorstFit = enum.auto()
Random = enum.auto()

def __repr__(self):
return self.name

def __str__(self):
return self.name


class BrokerSimplerUnitsPolicy(BrokerCalendarPolicy):
"""
BrokerSimplerUnitsPolicy is a simple implementation of the broker policy interface.
Expand Down Expand Up @@ -669,8 +657,9 @@ def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNod
@return tuple containing delegation id, sliver, error message if any
"""
delegation_id = None
node_id_list = self.__candidate_nodes(sliver=sliver)
if self.get_algorithm_type(site=sliver.site) == BrokerAllocationAlgorithm.Random:
node_id_list = FimHelper.candidate_nodes(combined_broker_model=self.combined_broker_model,
sliver=sliver)
if self.get_algorithm_type(site=sliver.site) == AllocationAlgorithm.Random:
random.shuffle(node_id_list)
else:
# Reshuffle Nodes based on CPU Threshold only for VMs when no specific host is specified
Expand Down Expand Up @@ -857,8 +846,8 @@ def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver:
device_name = owner_switch.get_name()

if device_name == Constants.AL2S:
delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations=
net_cp.get_label_delegations())
delegation_id, delegated_label = FimHelper.get_delegations(delegations=
net_cp.get_label_delegations())
device_name = delegated_label.device_name
local_name = delegated_label.local_name

Expand Down Expand Up @@ -938,11 +927,11 @@ def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver:
owner_mpls_ns = ns
break
if owner_ns and ServiceType.MPLS == owner_ns.get_type():
delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations=
owner_switch.get_label_delegations())
delegation_id, delegated_label = FimHelper.get_delegations(delegations=
owner_switch.get_label_delegations())
else:
delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations=
owner_ns.get_label_delegations())
delegation_id, delegated_label = FimHelper.get_delegations(delegations=
owner_ns.get_label_delegations())

# Set the Subnet and gateway from the Owner Switch (a)
existing_reservations = self.get_existing_reservations(node_id=owner_ns_id,
Expand Down Expand Up @@ -1718,17 +1707,17 @@ def unmerge_adm(self, *, graph_id: str):
self.combined_broker_model.rollback(graph_id=snapshot_graph_id)
raise e

def get_algorithm_type(self, site: str) -> BrokerAllocationAlgorithm:
def get_algorithm_type(self, site: str) -> AllocationAlgorithm:
if self.properties is not None:
algorithms = self.properties.get(Constants.ALGORITHM, None)
random_algo = algorithms.get(str(BrokerAllocationAlgorithm.Random))
random_algo = algorithms.get(str(AllocationAlgorithm.Random))
if random_algo and random_algo.get('enabled') and random_algo.get('sites') and \
site in random_algo.get('sites'):
return BrokerAllocationAlgorithm.Random
first_fit_algo = algorithms.get(BrokerAllocationAlgorithm.Random.name)
return AllocationAlgorithm.Random
first_fit_algo = algorithms.get(AllocationAlgorithm.Random.name)
if first_fit_algo and first_fit_algo.get('enabled'):
return BrokerAllocationAlgorithm.FirstFit
return BrokerAllocationAlgorithm.FirstFit
return AllocationAlgorithm.FirstFit
return AllocationAlgorithm.FirstFit

def get_core_capacity_threshold(self) -> Tuple[bool, int]:
if self.properties is not None:
Expand All @@ -1754,8 +1743,8 @@ def get_node_capacities(self, node_id: str, node_id_to_reservations: dict,
start=term.get_start_time(),
end=term.get_end_time())

delegation_id, delegated_capacity = NetworkNodeInventory.get_delegations(
lab_cap_delegations=graph_node.get_capacity_delegations())
delegation_id, delegated_capacity = FimHelper.get_delegations(
delegations=graph_node.get_capacity_delegations())

allocated_capacity = Capacities()

Expand Down
Loading