Skip to content

Commit

Permalink
Merge pull request #374 from fabric-testbed/371-advance-scheduling-cr…
Browse files Browse the repository at this point in the history
…eate

371 advance scheduling create
  • Loading branch information
kthare10 authored May 9, 2024
2 parents 98d7ab1 + d28e58c commit 13037b5
Show file tree
Hide file tree
Showing 26 changed files with 528 additions and 145 deletions.
2 changes: 1 addition & 1 deletion fabric_cf/actor/core/apis/abc_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, pr

@abstractmethod
def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None,
bdf: str = None) -> Dict[str, List[str]]:
bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]:
"""
Retrieves the components.
Expand Down
14 changes: 8 additions & 6 deletions fabric_cf/actor/core/core/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,14 @@ def recover(self):
client_slices = self.plugin.get_database().get_slices(slc_type=[SliceTypes.ClientSlice,
SliceTypes.BrokerClientSlice],
states=[SliceState.Configuring.value,
SliceState.Nascent.value,
SliceState.StableOK.value,
SliceState.StableError.value,
SliceState.Modifying.value,
SliceState.ModifyOK.value,
SliceState.ModifyError.value])
SliceState.Nascent.value,
SliceState.StableOK.value,
SliceState.StableError.value,
SliceState.Modifying.value,
SliceState.ModifyOK.value,
SliceState.ModifyError.value,
SliceState.AllocatedOK.value,
SliceState.AllocatedError.value])
self.logger.debug("Found {} client slices".format(len(client_slices)))
self.recover_slices(slices=client_slices)
self.logger.debug("Recovery of client slices complete")
Expand Down
5 changes: 5 additions & 0 deletions fabric_cf/actor/core/kernel/reservation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Author: Komal Thareja (kthare10@renci.org)
from __future__ import annotations

import datetime
import json
import re
import threading
Expand Down Expand Up @@ -422,6 +423,10 @@ def approve_redeem(self):
@return true if approved; false otherwise
"""
approved = True
now = datetime.datetime.now(datetime.timezone.utc)
if self.requested_term and self.requested_term.get_start_time() > now:
self.logger.debug(f"Future Reservation : {self}!")
return False

for pred_state in self.redeem_predecessors.values():
if pred_state.get_reservation() is None or \
Expand Down
22 changes: 19 additions & 3 deletions fabric_cf/actor/core/kernel/slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def is_stable_error(self) -> bool:
def is_stable(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.StableError or slice_state == SliceState.StableOk:
if slice_state in [SliceState.StableError, SliceState.StableOK]:
return True

return False
Expand All @@ -313,15 +313,31 @@ def is_modify_error(self) -> bool:
def is_modified(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.ModifyError or slice_state == SliceState.ModifyOK:
if slice_state in [SliceState.ModifyError, SliceState.ModifyOK]:
return True

return False

def is_allocated_error(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.AllocatedError:
return True

return False

def is_allocated(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
return True

return False

def is_dead_or_closing(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.Dead or slice_state == SliceState.Closing:
if slice_state in [SliceState.Dead, SliceState.Closing]:
return True

return False
Expand Down
82 changes: 72 additions & 10 deletions fabric_cf/actor/core/kernel/slice_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class SliceState(Enum):
Modifying = enum.auto()
ModifyError = enum.auto()
ModifyOK = enum.auto()
AllocatedError = enum.auto()
AllocatedOK = enum.auto()
All = enum.auto() # used only for querying

def __str__(self):
Expand Down Expand Up @@ -102,6 +104,10 @@ def translate(state_name: str):
return SliceState.Closing
elif state_name.lower() == SliceState.Dead.name.lower():
return SliceState.Dead
elif state_name.lower() == SliceState.AllocatedOK.name.lower():
return SliceState.Closing
elif state_name.lower() == SliceState.AllocatedError.name.lower():
return SliceState.Dead
else:
return SliceState.All

Expand All @@ -117,6 +123,12 @@ def is_stable(*, state) -> bool:
return True
return False

@staticmethod
def is_allocated(*, state) -> bool:
if state == SliceState.AllocatedOK or state == SliceState.AllocatedError:
return True
return False

@staticmethod
def is_modified(*, state) -> bool:
if state == SliceState.ModifyOK or state == SliceState.ModifyError:
Expand Down Expand Up @@ -183,18 +195,20 @@ def has_state_other_than(self, *states) -> bool:
class SliceStateMachine:
CREATE = SliceOperation(SliceCommand.Create, SliceState.Nascent)

MODIFY = SliceOperation(SliceCommand.Modify, SliceState.StableOK, SliceState.StableError, SliceState.Configuring)
MODIFY = SliceOperation(SliceCommand.Modify, SliceState.StableOK, SliceState.StableError, SliceState.Configuring,
SliceState.AllocatedOK, SliceState.AllocatedError)

MODIFY_ACCEPT = SliceOperation(SliceCommand.ModifyAccept, SliceState.ModifyOK, SliceState.ModifyError,
SliceState.Modifying)
SliceState.Modifying, SliceState.AllocatedOK, SliceState.AllocatedError)

DELETE = SliceOperation(SliceCommand.Delete, SliceState.Nascent, SliceState.StableOK, SliceState.StableError,
SliceState.Configuring, SliceState.Modifying, SliceState.ModifyOK, SliceState.ModifyError,
SliceState.Dead)
SliceState.Dead, SliceState.AllocatedOK, SliceState.AllocatedError)

REEVALUATE = SliceOperation(SliceCommand.Reevaluate, SliceState.Nascent, SliceState.StableOK,
SliceState.StableError, SliceState.Configuring, SliceState.Dead, SliceState.Closing,
SliceState.Modifying, SliceState.ModifyError, SliceState.ModifyOK)
SliceState.Modifying, SliceState.ModifyError, SliceState.ModifyOK,
SliceState.AllocatedError, SliceState.AllocatedOK)

def __init__(self, *, slice_id: ID):
self.slice_guid = slice_id
Expand Down Expand Up @@ -238,9 +252,16 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati

elif operation.command == SliceCommand.ModifyAccept:
if self.state == SliceState.ModifyError:
self.state = SliceState.StableError
if self.last_state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
self.state = SliceState.AllocatedError
else:
self.state = SliceState.StableError

elif self.state == SliceState.ModifyOK:
self.state = SliceState.StableOK
if self.last_state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
self.state = SliceState.AllocatedOK
else:
self.state = SliceState.StableOK

elif operation.command == SliceCommand.Delete:
if self.state != SliceState.Dead:
Expand All @@ -266,7 +287,36 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati
if not has_error and r.get_error_message() is not None and len(r.get_error_message()) > 0:
has_error = True

if self.state == SliceState.Nascent or self.state == SliceState.Configuring:
if self.state in [SliceState.Nascent, SliceState.Configuring]:
if not bins.has_state_other_than(ReservationStates.Active, ReservationStates.Closed,
ReservationStates.CloseFail):
if not has_error:
self.state = SliceState.StableOK
else:
self.state = SliceState.StableError

if (not bins.has_state_other_than(ReservationStates.Active, ReservationStates.Failed,
ReservationStates.Closed, ReservationStates.CloseFail)) and \
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.StableError

if not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Closed,
ReservationStates.CloseFail):
if not has_error:
self.state = SliceState.AllocatedOK
else:
self.state = SliceState.AllocatedError

if (not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Failed,
ReservationStates.Closed, ReservationStates.CloseFail)) and \
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.AllocatedError

if not bins.has_state_other_than(ReservationStates.Closed, ReservationStates.CloseWait,
ReservationStates.Failed, ReservationStates.CloseFail):
self.state = SliceState.Closing

if self.state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
if not bins.has_state_other_than(ReservationStates.Active, ReservationStates.Closed,
ReservationStates.CloseFail):
if not has_error:
Expand Down Expand Up @@ -296,12 +346,24 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.ModifyError

if not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Closed,
ReservationStates.CloseFail):
if has_error:
self.state = SliceState.ModifyError
else:
self.state = SliceState.ModifyOK

if (not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Failed,
ReservationStates.Closed, ReservationStates.CloseFail)) and \
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.ModifyError

if not bins.has_state_other_than(ReservationStates.Closed, ReservationStates.CloseWait,
ReservationStates.Failed, ReservationStates.CloseFail):
self.state = SliceState.Closing

elif self.state == SliceState.StableError or self.state == SliceState.StableOK or \
self.state == SliceState.ModifyError or self.state == SliceState.ModifyOK:
elif self.state in [SliceState.StableError, SliceState.StableOK, SliceState.ModifyError,
SliceState.ModifyOK, SliceState.AllocatedError, SliceState.AllocatedOK]:
if not bins.has_state_other_than(ReservationStates.Closed, ReservationStates.CloseWait,
ReservationStates.Failed, ReservationStates.CloseFail):
self.state = SliceState.Dead
Expand All @@ -326,4 +388,4 @@ def get_state(self) -> SliceState:
return self.state

def clear(self):
self.state = SliceState.Nascent
self.state = SliceState.Nascent
10 changes: 2 additions & 8 deletions fabric_cf/actor/core/manage/actor_management_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from typing import TYPE_CHECKING, List, Dict, Tuple

from fabric_cf.actor.fim.fim_helper import FimHelper
from fabric_mb.message_bus.messages.poa_avro import PoaAvro
from fabric_mb.message_bus.messages.poa_info_avro import PoaInfoAvro
from fabric_mb.message_bus.messages.reservation_mng import ReservationMng
from fabric_mb.message_bus.messages.result_delegation_avro import ResultDelegationAvro
from fabric_mb.message_bus.messages.result_poa_avro import ResultPoaAvro
Expand All @@ -43,7 +41,6 @@
from fabric_mb.message_bus.messages.result_slice_avro import ResultSliceAvro
from fabric_mb.message_bus.messages.slice_avro import SliceAvro
from fim.user import GraphFormat
from fim.user.topology import AdvertizedTopology

from fabric_cf.actor.core.apis.abc_actor_runnable import ABCActorRunnable
from fabric_cf.actor.core.common.constants import Constants, ErrorCodes
Expand Down Expand Up @@ -96,7 +93,6 @@ def save(self) -> dict:
return properties

def recover(self):
actor_name = None
if Constants.PROPERTY_ACTOR_NAME in self.serial:
actor_name = self.serial[Constants.PROPERTY_ACTOR_NAME]
else:
Expand Down Expand Up @@ -135,7 +131,6 @@ def make_local_db_object(self, *, actor: ABCActorMixin):
def set_actor(self, *, actor: ABCActorMixin):
if self.actor is None:
self.actor = actor
#self.db = actor.get_plugin().get_database()
self.logger = actor.get_logger()
self.id = actor.get_guid()
self.make_local_db_object(actor=actor)
Expand Down Expand Up @@ -195,7 +190,7 @@ def add_slice(self, *, slice_obj: SliceAvro, caller: AuthToken) -> ResultStringA
slice_obj_new.set_graph_id(graph_id=slice_obj.graph_id)
slice_obj_new.set_config_properties(value=slice_obj.get_config_properties())
slice_obj_new.set_lease_end(lease_end=slice_obj.get_lease_end())
slice_obj_new.set_lease_start(lease_start=datetime.now(timezone.utc))
slice_obj_new.set_lease_start(lease_start=slice_obj.get_lease_start())

if slice_obj.get_inventory():
slice_obj_new.set_inventory(value=True)
Expand Down Expand Up @@ -870,8 +865,7 @@ def build_broker_query_model(self, level_0_broker_query_model: str, level: int,
start: datetime = None, end: datetime = None, includes: str = None,
excludes: str = None) -> str:
try:
db = self.actor.get_plugin().get_database()
return FimHelper.build_broker_query_model(db=db, level_0_broker_query_model=level_0_broker_query_model,
return FimHelper.build_broker_query_model(level_0_broker_query_model=level_0_broker_query_model,
level=level, graph_format=graph_format, start=start,
end=end, includes=includes, excludes=excludes)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion fabric_cf/actor/core/manage/local/local_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,4 @@ def poa(self, *, poa: PoaAvro) -> bool:
except Exception as e:
self.on_exception(e=e, traceback_str=traceback.format_exc())

return False
return False
15 changes: 11 additions & 4 deletions fabric_cf/actor/core/plugins/db/actor_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ def add_reservation(self, *, reservation: ABCReservationMixin):
if node_id and comp_id and bdf:
components.append((node_id, comp_id, bdf))

term = reservation.get_term()

self.db.add_reservation(slc_guid=str(reservation.get_slice_id()),
rsv_resid=str(reservation.get_reservation_id()),
rsv_category=reservation.get_category().value,
Expand All @@ -271,7 +273,9 @@ def add_reservation(self, *, reservation: ABCReservationMixin):
properties=properties,
rsv_graph_node_id=reservation.get_graph_node_id(),
oidc_claim_sub=oidc_claim_sub, email=email, site=site, rsv_type=rsv_type,
components=components)
components=components,
lease_start=term.get_start_time() if term else None,
lease_end=term.get_end_time() if term else None)
self.logger.debug(
"Reservation {} added to slice {}".format(reservation.get_reservation_id(), reservation.get_slice()))
finally:
Expand Down Expand Up @@ -308,6 +312,7 @@ def update_reservation(self, *, reservation: ABCReservationMixin):
if node_id and comp_id and bdf:
components.append((node_id, comp_id, bdf))

term = reservation.get_term()
begin = time.time()
properties = pickle.dumps(reservation)
diff = int(time.time() - begin)
Expand All @@ -322,7 +327,9 @@ def update_reservation(self, *, reservation: ABCReservationMixin):
rsv_joining=reservation.get_join_state().value,
properties=properties,
rsv_graph_node_id=reservation.get_graph_node_id(),
site=site, rsv_type=rsv_type, components=components)
site=site, rsv_type=rsv_type, components=components,
lease_start=term.get_start_time() if term else None,
lease_end=term.get_end_time() if term else None)
diff = int(time.time() - begin)
if diff > 0:
self.logger.info(f"DB TIME: {diff}")
Expand Down Expand Up @@ -459,10 +466,10 @@ def get_authority_reservations(self) -> List[ABCReservationMixin]:
return result

def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None,
bdf: str = None) -> Dict[str, List[str]]:
bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]:
try:
return self.db.get_components(node_id=node_id, states=states, component=component, bdf=bdf,
rsv_type=rsv_type)
rsv_type=rsv_type, start=start, end=end)
except Exception as e:
self.logger.error(e)
finally:
Expand Down
Loading

0 comments on commit 13037b5

Please sign in to comment.