diff --git a/src/lbaf/Execution/lbsAlgorithmBase.py b/src/lbaf/Execution/lbsAlgorithmBase.py
index 6778ff268..a0cb74a6e 100644
--- a/src/lbaf/Execution/lbsAlgorithmBase.py
+++ b/src/lbaf/Execution/lbsAlgorithmBase.py
@@ -115,7 +115,7 @@ def report_final_mapping(self, logger):
     def execute(self, phases, distributions, statistics, a_min_max):
         """ Execute balancing algorithm on Phase instance
             phases: list of Phase instances
-            distributions: dictionary of time-varying variables
+            distributions: dictionary of load-varying variables
             statistics: dictionary of statistics
             a_min_max: possibly empty list of optimal arrangements"""

diff --git a/src/lbaf/Execution/lbsBruteForceAlgorithm.py b/src/lbaf/Execution/lbsBruteForceAlgorithm.py
index f2fa7cf38..bbad35ea5 100644
--- a/src/lbaf/Execution/lbsBruteForceAlgorithm.py
+++ b/src/lbaf/Execution/lbsBruteForceAlgorithm.py
@@ -43,7 +43,7 @@ def compute_arrangement_works(self, objects: tuple, arrangement: tuple) -> dict:
         for rank, rank_object_ids in ranks.items():
             # Compute load component for current rank
             values = {
-                "load": sum([objects[i].get("time") for i in rank_object_ids])}
+                "load": sum([objects[i].get("load") for i in rank_object_ids])}

             # Compute received communication volume
             v = 0.0
@@ -88,7 +88,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, _):
             entry = {
                 "id": o.get_id(),
                 "rank": rank,
-                "time": o.get_time(),
+                "load": o.get_load(),
                 "to": {},
                 "from": {}}
             comm = o.get_communicator()
@@ -153,7 +153,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, _):
             object_id = objects[i]["id"]
             for o in p_src.get_objects():
                 if o.get_id() == object_id:
-                    self.__logger.debug(f"transferring object {o.get_id()} ({o.get_time()}) to rank {p_dst.get_id()}")
+                    self.__logger.debug(f"transferring object {o.get_id()} ({o.get_load()}) to rank {p_dst.get_id()}")
                     p_src.remove_migratable_object(o, p_dst)
                     p_dst.add_migratable_object(o)
                     o.set_rank_id(p_dst.get_id())
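For readers skimming the brute-force hunks above: compute_arrangement_works now totals the renamed "load" key per rank. A minimal sketch of that aggregation, not part of this patch (the helper name and the literal dicts below are hypothetical):

# Sketch: how per-rank loads are accumulated for one brute-force
# arrangement, using the renamed "load" key (formerly "time").
from collections import defaultdict

def rank_loads(objects: tuple, arrangement: tuple) -> dict:
    """Map each rank to the total load of the objects assigned to it."""
    loads = defaultdict(float)
    for obj, rank in zip(objects, arrangement):
        loads[rank] += obj.get("load")
    return dict(loads)

# Example: three objects assigned to ranks (0, 1, 0)
print(rank_loads(({"load": 1.0}, {"load": 2.5}, {"load": 0.5}), (0, 1, 0)))
# {0: 1.5, 1: 2.5}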
diff --git a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
index b3b4d97d2..e150a743a 100644
--- a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
+++ b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
@@ -54,8 +54,8 @@ def __init__(self, work_model, parameters: dict, lgr: Logger):
         self.__strategy_mapped = {
             "arbitrary": self.arbitrary,
             "element_id": self.element_id,
-            "decreasing_times": self.decreasing_times,
-            "increasing_times": self.increasing_times,
+            "decreasing_loads": self.decreasing_loads,
+            "increasing_loads": self.increasing_loads,
             "increasing_connectivity": self.increasing_connectivity,
             "fewest_migrations": self.fewest_migrations,
             "small_objects": self.small_objects}
@@ -280,7 +280,7 @@ def transfer_stage(self):
             self.__logger.debug(f"Transferring {len(object_list)} object(s) at once")
             for o in object_list:
                 self.__logger.debug(
-                    f"transferring object {o.get_id()} ({o.get_time()}) to rank {p_dst.get_id()} "
+                    f"transferring object {o.get_id()} ({o.get_load()}) to rank {p_dst.get_id()} "
                     f"(criterion: {c_dst})")
                 p_src.remove_migratable_object(o, p_dst)
                 p_dst.add_migratable_object(o)
@@ -367,14 +367,14 @@ def element_id(objects: set, _):
         return sorted(objects, key=lambda x: x.get_id())

     @staticmethod
-    def decreasing_times(objects: set, _):
-        """ Order objects by decreasing object times."""
-        return sorted(objects, key=lambda x: -x.get_time())
+    def decreasing_loads(objects: set, _):
+        """ Order objects by decreasing object loads."""
+        return sorted(objects, key=lambda x: -x.get_load())

     @staticmethod
-    def increasing_times(objects: set, _):
-        """ Order objects by increasing object times."""
-        return sorted(objects, key=lambda x: x.get_time())
+    def increasing_loads(objects: set, _):
+        """ Order objects by increasing object loads."""
+        return sorted(objects, key=lambda x: x.get_load())

     @staticmethod
     def increasing_connectivity(objects: set, src_id):
@@ -399,14 +399,14 @@ def increasing_connectivity(objects: set, src_id):

     @staticmethod
     def sorted_ascending(objects: Union[set, list]):
-        return sorted(objects, key=lambda x: x.get_time())
+        return sorted(objects, key=lambda x: x.get_load())

     @staticmethod
     def sorted_descending(objects: Union[set, list]):
-        return sorted(objects, key=lambda x: -x.get_time())
+        return sorted(objects, key=lambda x: -x.get_load())

     def load_excess(self, objects: set):
-        rank_load = sum([obj.get_time() for obj in objects])
+        rank_load = sum([obj.get_load() for obj in objects])
         return rank_load - self.__average_load

     def fewest_migrations(self, objects: set, _):
@@ -415,8 +415,8 @@ def fewest_migrations(self, objects: set, _):
            Sort largest to the smallest if <= load_excess
            Sort smallest to the largest if > load_excess"""
         load_excess = self.load_excess(objects)
-        lt_load_excess = [obj for obj in objects if obj.get_time() <= load_excess]
-        get_load_excess = [obj for obj in objects if obj.get_time() > load_excess]
+        lt_load_excess = [obj for obj in objects if obj.get_load() <= load_excess]
+        get_load_excess = [obj for obj in objects if obj.get_load() > load_excess]
         return self.sorted_descending(lt_load_excess) + self.sorted_ascending(get_load_excess)

     def small_objects(self, objects: set, _):
@@ -426,6 +426,6 @@ def small_objects(self, objects: set, _):
            Sort smallest to the largest if > load_excess"""
         load_excess = self.load_excess(objects)
         sorted_objects = self.sorted_ascending(objects)
-        accumulated_times = list(accumulate(obj.get_time() for obj in sorted_objects))
-        idx = bisect(accumulated_times, load_excess) + 1
+        accumulated_loads = list(accumulate(obj.get_load() for obj in sorted_objects))
+        idx = bisect(accumulated_loads, load_excess) + 1
         return self.sorted_descending(sorted_objects[:idx]) + self.sorted_ascending(sorted_objects[idx:])
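The renamed ordering strategies above decide which objects a rank offers for transfer. A standalone sketch of the small_objects logic, assuming plain float loads instead of Object instances (illustrative only, not code from this patch):

# Sketch: sort ascending by load, accumulate prefix sums, then find the
# shortest prefix whose total load covers the rank's load excess. That
# prefix is offered largest-first, the remainder smallest-first.
from bisect import bisect
from itertools import accumulate

def order_small_objects(loads: list, load_excess: float) -> list:
    sorted_loads = sorted(loads)
    prefix = list(accumulate(sorted_loads))
    idx = bisect(prefix, load_excess) + 1
    return sorted_loads[:idx][::-1] + sorted_loads[idx:]

print(order_small_objects([4.0, 1.0, 2.0, 0.5], load_excess=3.0))
# [2.0, 1.0, 0.5, 4.0]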
diff --git a/src/lbaf/Execution/lbsTemperedCriterion.py b/src/lbaf/Execution/lbsTemperedCriterion.py
index a335bdcef..4782b537b 100644
--- a/src/lbaf/Execution/lbsTemperedCriterion.py
+++ b/src/lbaf/Execution/lbsTemperedCriterion.py
@@ -38,7 +38,7 @@ def compute(self, objects: list, p_src: Rank, p_dst: Rank) -> float:
         w_max_0 = max(w_src_0, w_dst_0)

         # Update loads in proposed new arrangement
-        object_loads = sum([o.get_time() for o in objects])
+        object_loads = sum([o.get_load() for o in objects])
         values_src["load"] -= object_loads
         values_dst["load"] += object_loads

diff --git a/src/lbaf/IO/configurationValidator.py b/src/lbaf/IO/configurationValidator.py
index d1181451d..91b63919b 100644
--- a/src/lbaf/IO/configurationValidator.py
+++ b/src/lbaf/IO/configurationValidator.py
@@ -10,8 +10,8 @@
 ALLOWED_STRATEGIES = (
     "arbitrary",
     "element_id",
-    "increasing_times",
-    "decreasing_times",
+    "increasing_loads",
+    "decreasing_loads",
     "increasing_connectivity",
     "fewest_migrations",
     "small_objects")
@@ -22,7 +22,7 @@
     "PhaseStepper")
 ALLOWED_CRITERIA = ("Tempered", "StrictLocalizer")
 ALLOWED_LOGGING_LEVELS = ("info", "debug", "warning", "error")
-ALLOWED_TIME_VOLUME_SAMPLER = ("uniform", "lognormal")
+ALLOWED_LOAD_VOLUME_SAMPLER = ("uniform", "lognormal")
 ALLOWED_TERMINAL_BACKGROUND = ("light", "dark")
@@ -102,12 +102,12 @@ def __init__(self, config_to_validate: dict, logger: Logger):
             "n_mapped_ranks": And(int, lambda x: x >= 0, error="Should be of type 'int' and >= 0"),
             "communication_degree": int,
-            "time_sampler": {
+            "load_sampler": {
                 "name": And(
                     str,
                     Use(str.lower),
-                    lambda a: a in ALLOWED_TIME_VOLUME_SAMPLER,
-                    error=f"{get_error_message(ALLOWED_TIME_VOLUME_SAMPLER)} must be chosen"),
+                    lambda a: a in ALLOWED_LOAD_VOLUME_SAMPLER,
+                    error=f"{get_error_message(ALLOWED_LOAD_VOLUME_SAMPLER)} must be chosen"),
                 "parameters": And(
                     [float],
                     lambda s: len(s) == 2,
@@ -116,8 +116,8 @@ def __init__(self, config_to_validate: dict, logger: Logger):
                 "name": And(
                     str,
                     Use(str.lower),
-                    lambda b: b in ALLOWED_TIME_VOLUME_SAMPLER,
-                    error=f"{get_error_message(ALLOWED_TIME_VOLUME_SAMPLER)} must be chosen"),
+                    lambda b: b in ALLOWED_LOAD_VOLUME_SAMPLER,
+                    error=f"{get_error_message(ALLOWED_LOAD_VOLUME_SAMPLER)} must be chosen"),
                 "parameters": And(
                     [float],
                     lambda s: len(s) == 2,
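To see what the renamed load_sampler block accepts, here is a hedged, standalone approximation using the same schema package; the real schema in configurationValidator.py carries more keys and richer error messages:

# Sketch: validate a "load_sampler" config section in isolation.
from schema import And, Schema, Use

ALLOWED_LOAD_VOLUME_SAMPLER = ("uniform", "lognormal")

load_sampler_schema = Schema({
    "name": And(str, Use(str.lower), lambda a: a in ALLOWED_LOAD_VOLUME_SAMPLER),
    "parameters": And([float], lambda s: len(s) == 2)})

# A YAML section such as `load_sampler: {name: lognormal, parameters: [1.0, 10.0]}`
# parses to a dict that validates cleanly:
print(load_sampler_schema.validate({"name": "lognormal", "parameters": [1.0, 10.0]}))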
diff --git a/src/lbaf/IO/lbsMeshBasedVisualizer.py b/src/lbaf/IO/lbsMeshBasedVisualizer.py
index 929e18b64..042f727e2 100644
--- a/src/lbaf/IO/lbsMeshBasedVisualizer.py
+++ b/src/lbaf/IO/lbsMeshBasedVisualizer.py
@@ -67,17 +67,17 @@ def __init__(self, logger: Logger, phases: list, grid_size: list, object_jitter=
         # Initialize maximum edge volume
         self.__max_volume = 0.0

-        # Compute object time range
-        self.__time_range = [math.inf, 0.0]
+        # Compute object load range
+        self.__load_range = [math.inf, 0.0]
         for p in self.__phases:
             for r in p.get_ranks():
                 for o in r.get_objects():
-                    # Update time range when necessary
-                    time = o.get_time()
-                    if time > self.__time_range[1]:
-                        self.__time_range[1] = time
-                    if time < self.__time_range[0]:
-                        self.__time_range[0] = time
+                    # Update load range when necessary
+                    load = o.get_load()
+                    if load > self.__load_range[1]:
+                        self.__load_range[1] = load
+                    if load < self.__load_range[0]:
+                        self.__load_range[0] = load

         # Assemble file and path names from constructor parameters
         self.__rank_file_name = f"{output_file_stem}_rank_view.e"
@@ -215,7 +215,7 @@ def create_object_mesh(self, phase: Phase, object_mapping: set):
         self.__logger.info(
             f"Creating object mesh with {n_o} points and {n_e} edges")

-        # Create point array for object times
+        # Create point array for object loads
         t_arr = vtk.vtkDoubleArray()
         t_arr.SetName("Load")
         t_arr.SetNumberOfTuples(n_o)
@@ -275,8 +275,8 @@ def create_object_mesh(self, phase: Phase, object_mapping: set):
                 (0.0, 0.0, 0.0))[d] + c) * o_resolution
                 for d, c in enumerate(self.global_id_to_cartesian(
                     i, rank_size))])
-            time = o.get_time()
-            t_arr.SetTuple1(point_index, time)
+            load = o.get_load()
+            t_arr.SetTuple1(point_index, load)
             b_arr.SetTuple1(point_index, m)

             # Update sent volumes
@@ -461,7 +461,7 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
         renderer.AddActor(edge_actor)
         renderer.AddActor2D(volume_actor)

-        # Compute square root of object times
+        # Compute square root of object loads
         sqrtT = vtk.vtkArrayCalculator()
         sqrtT.SetInputData(object_mesh)
         sqrtT.AddScalarArrayName("Load")
@@ -486,7 +486,7 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
             thresh_out.GetPointData().SetActiveScalars(
                 sqrtT_str)

-            # Glyph by square root of object times
+            # Glyph by square root of object loads
             glyph = vtk.vtkGlyphSource2D()
             getattr(glyph, f"SetGlyphTypeTo{v}")()
             glyph.SetResolution(32)
@@ -513,16 +513,16 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
             glyph_mapper.SetInputConnection(trans.GetOutputPort())
             glyph_mapper.SetLookupTable(
                 self.create_color_transfer_function(
-                    self.__time_range, "blue_to_red"))
-            glyph_mapper.SetScalarRange(self.__time_range)
+                    self.__load_range, "blue_to_red"))
+            glyph_mapper.SetScalarRange(self.__load_range)
             glyph_actor = vtk.vtkActor()
             glyph_actor.SetMapper(glyph_mapper)
             renderer.AddActor(glyph_actor)

-        # Create and add unique scalar bar for object time
-        time_actor = self.create_scalar_bar_actor(
+        # Create and add unique scalar bar for object load
+        load_actor = self.create_scalar_bar_actor(
             glyph_mapper, "Object Load", 0.55, 0.05)
-        renderer.AddActor2D(time_actor)
+        renderer.AddActor2D(load_actor)

         # Create text actor to indicate iteration
         text_actor = vtk.vtkTextActor()
@@ -615,7 +615,7 @@ def generate(self, gen_meshes, gen_mulmed):
             f"\tcommunication edges width: {edge_width:.2g}")
         glyph_factor = self.__grid_resolution / (
             (self.__max_o_per_dim + 1)
-            * math.sqrt(self.__time_range[1]))
+            * math.sqrt(self.__load_range[1]))
         self.__logger.info(
             f"\tobject glyphs scaling: {glyph_factor:.2g}")
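The visualizer glyphs objects by the square root of their load; the likely intent (an assumption on my part, not stated in the diff) is that a 2D glyph scaled by sqrt(load) has its area, the visually dominant cue, grow linearly with load. A sketch:

# Sketch: normalized glyph scale for a given object load.
import math

def glyph_scale(load: float, max_load: float) -> float:
    """Glyph scale in [0, 1]; area is then proportional to load."""
    return math.sqrt(load) / math.sqrt(max_load)

for load in (0.25, 1.0, 4.0):
    print(f"load={load}: scale={glyph_scale(load, max_load=4.0):.2f}")
# load quadruples -> linear size doubles -> area quadruples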
diff --git a/src/lbaf/IO/lbsVTDataReader.py b/src/lbaf/IO/lbsVTDataReader.py
index fcb839230..65ab3dec1 100644
--- a/src/lbaf/IO/lbsVTDataReader.py
+++ b/src/lbaf/IO/lbsVTDataReader.py
@@ -237,8 +237,8 @@ def json_reader(self, returned_dict: dict, phase_id: int, node_id: int) -> tuple
             phase_rank := Rank(node_id, logger=self.__logger))

         # Iterate over tasks
-        for task in p["tasks"]:
-            task_time = task.get("time")
+        for task in phase["tasks"]:
+            task_load = task.get("time")
             entity = task.get("entity")
             task_object_id = entity.get("id")
             task_used_defined = task.get("user_defined")

diff --git a/src/lbaf/IO/lbsVTDataWriter.py b/src/lbaf/IO/lbsVTDataWriter.py
index b0de42753..e77fa1485 100644
--- a/src/lbaf/IO/lbsVTDataWriter.py
+++ b/src/lbaf/IO/lbsVTDataWriter.py
@@ -57,21 +57,21 @@ def json_writer(self, rank: Rank) -> str:
         for o in rank.get_objects():
             # Write object to file and increment count
             try:
-                # writer.writerow([o.get_rank_id(), o.get_id(), o.get_time()])
+                # writer.writerow([o.get_rank_id(), o.get_id(), o.get_load()])
                 rank_id = o.get_rank_id()
                 obj_id = o.get_id()
-                obj_time = o.get_time()
+                obj_load = o.get_load()
                 if isinstance(temp_dict.get(rank_id, None), list):
                     temp_dict[rank_id].append({
                         "rank_id": rank_id,
                         "obj_id": obj_id,
-                        "obj_time": obj_time})
+                        "obj_load": obj_load})
                 else:
                     temp_dict[rank_id] = list()
                     temp_dict[rank_id].append({
                         "rank_id": rank_id,
                         "obj_id": obj_id,
-                        "obj_time": obj_time})
+                        "obj_load": obj_load})
             except:
                 n_u += 1
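Note that json_reader still calls task.get("time"): only the in-memory variable is renamed, because the vt data files keep their existing "time" key. A sketch of that boundary with hypothetical data:

# Sketch: the on-disk JSON schema is unchanged, so the reader maps the
# stored "time" field to the renamed in-memory notion of load.
import json

record = json.loads('{"tasks": [{"time": 0.75, "entity": {"id": 42}}]}')
for task in record["tasks"]:
    task_load = task.get("time")  # JSON key unchanged; variable renamed
    print(f"object {task['entity']['id']}: load={task_load}")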
diff --git a/src/lbaf/Model/lbsLoadOnlyWorkModel.py b/src/lbaf/Model/lbsLoadOnlyWorkModel.py
index d12700866..ff35c30a3 100644
--- a/src/lbaf/Model/lbsLoadOnlyWorkModel.py
+++ b/src/lbaf/Model/lbsLoadOnlyWorkModel.py
@@ -21,7 +21,7 @@ def __init__(self, _, lgr: Logger):
         self.__logger.info("Instantiated concrete work model")

     def compute(self, rank: Rank):
-        """ A work model summing all object times on given rank
+        """ A work model summing all object loads on given rank
         """
         # Return total load on this rank
         return rank.get_load()

diff --git a/src/lbaf/Model/lbsObject.py b/src/lbaf/Model/lbsObject.py
index b176dc06e..a437079e3 100644
--- a/src/lbaf/Model/lbsObject.py
+++ b/src/lbaf/Model/lbsObject.py
@@ -5,64 +5,64 @@
 class Object:
-    """ A class representing an object with time and communicator
+    """ A class representing an object with load and communicator
     """
-    def __init__(self, i: int, t: float, p: int = None, c: ObjectCommunicator = None, user_defined: dict = None,
-                 subphases: list = None):
+    def __init__(
+        self, i: int, t: float, p: int = None, c: ObjectCommunicator = None, user_defined: dict = None, subphases: list = None):

         # Object index
         if not isinstance(i, int) or isinstance(i, bool):
             sys.excepthook = exc_handler
-            raise TypeError(f"i: {i} is type of {type(i)}! Must be <class 'int'>!")
+            raise TypeError(f"i: {i} is of type {type(i)} but must be <class 'int'>")
         else:
             self.__index = i

-        # Time required to perform the work of this object
+        # Load required to perform the work of this object
         if not isinstance(t, float):
             sys.excepthook = exc_handler
-            raise TypeError(f"t: {t} is type of {type(t)}! Must be <class 'float'>!")
+            raise TypeError(f"t: {t} is of type {type(t)} but must be <class 'float'>")
         else:
-            self.__time = t
+            self.__load = t

         # Rank to which object is currently assigned if defined
         if bool(isinstance(p, int) or p is None) and not isinstance(p, bool):
             self.__rank_id = p
         else:
             sys.excepthook = exc_handler
-            raise TypeError(f"p: {p} is type of {type(p)}! Must be <class 'int'>!")
+            raise TypeError(f"p: {p} is of type {type(p)} but must be <class 'int'>")

         # Communication graph of this object if defined
         if isinstance(c, ObjectCommunicator) or c is None:
             self.__communicator = c
         else:
             sys.excepthook = exc_handler
-            raise TypeError(f"c: {c} is type of {type(c)}! Must be <class 'ObjectCommunicator'>!")
+            raise TypeError(f"c: {c} is of type {type(c)} but must be <class 'ObjectCommunicator'>")

         # User defined fields
         if isinstance(user_defined, dict) or user_defined is None:
             self.__user_defined = user_defined
         else:
             sys.excepthook = exc_handler
-            raise TypeError(f"user_defined: {user_defined} is type of {type(user_defined)}! Must be <class 'dict'>!")
+            raise TypeError(f"user_defined: {user_defined} is of type {type(user_defined)} but must be <class 'dict'>")

         # Sub-phases
         if isinstance(subphases, list) or subphases is None:
             self.__subphases = subphases
         else:
             sys.excepthook = exc_handler
-            raise TypeError(f"subphases: {subphases} is type of {type(subphases)}! Must be <class 'list'>!")
+            raise TypeError(f"subphases: {subphases} is of type {type(subphases)} but must be <class 'list'>")

     def __repr__(self):
-        return f"Object id: {self.__index}, time: {self.__time}"
+        return f"Object id: {self.__index}, load: {self.__load}"

     def get_id(self) -> int:
         """ Return object ID
         """
         return self.__index

-    def get_time(self) -> float:
-        """ Return object time
+    def get_load(self) -> float:
+        """ Return object load
         """
-        return self.__time
+        return self.__load

     def get_sent(self) -> dict:
         """ Return communications sent by object to other objects
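Usage of the renamed accessor, as a sketch (the import path follows the repository layout and assumes the package is importable; arguments as in the constructor above):

# Sketch: construct an Object and read its load via the renamed API.
from lbaf.Model.lbsObject import Object

o = Object(0, 1.5)      # index 0, load 1.5
print(o.get_load())     # 1.5, formerly o.get_time()
print(o)                # "Object id: 0, load: 1.5"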
diff --git a/src/lbaf/Model/lbsPhase.py b/src/lbaf/Model/lbsPhase.py
index f73b4ffb5..997bafe27 100644
--- a/src/lbaf/Model/lbsPhase.py
+++ b/src/lbaf/Model/lbsPhase.py
@@ -141,16 +141,16 @@ def invalidate_edge_cache(self):
     def populate_from_samplers(self, n_ranks, n_objects, t_sampler, v_sampler, c_degree, n_r_mapped=0):
         """ Use samplers to populate either all or n ranks in a phase."""

-        # Retrieve desired time sampler with its theoretical average
-        time_sampler, sampler_name = sampler(t_sampler.get("name"), t_sampler.get("parameters"), self.__logger)
+        # Retrieve desired load sampler with its theoretical average
+        load_sampler, sampler_name = sampler(t_sampler.get("name"), t_sampler.get("parameters"), self.__logger)

-        # Create n_objects objects with uniformly distributed times in given range
-        self.__logger.info(f"Creating {n_objects} objects with times sampled from {sampler_name}")
+        # Create n_objects objects with uniformly distributed loads in given range
+        self.__logger.info(f"Creating {n_objects} objects with loads sampled from {sampler_name}")
         self.__number_of_objects = n_objects
-        objects = set([Object(i, time_sampler()) for i in range(n_objects)])
+        objects = set([Object(i, load_sampler()) for i in range(n_objects)])

-        # Compute and report object time statistics
-        print_function_statistics(objects, lambda x: x.get_time(), "object times", self.__logger)
+        # Compute and report object load statistics
+        print_function_statistics(objects, lambda x: x.get_load(), "object loads", self.__logger)

         # Decide whether communications must be created
         if c_degree > 0:
@@ -247,8 +247,8 @@ def populate_from_log(self, t_s, basename):
         objects = set()
         for p in self.__ranks:
             objects = objects.union(p.get_objects())
-        print_function_statistics(objects, lambda x: x.get_time(), "object times", self.__logger)
+        print_function_statistics(objects, lambda x: x.get_load(), "object loads", self.__logger)

         # Set number of read objects
         self.__n_objects = len(objects)
         self.__logger.info(f"Read {self.__n_objects} objects from time-step {t_s} of data files with prefix {basename}")

diff --git a/src/lbaf/Model/lbsRank.py b/src/lbaf/Model/lbsRank.py
index 4fc899e9a..ea197c0b5 100644
--- a/src/lbaf/Model/lbsRank.py
+++ b/src/lbaf/Model/lbsRank.py
@@ -99,7 +99,7 @@ def remove_migratable_object(self, o: Object, p_dst: "Rank"):

         # Update known loads when these exist
         if self.__known_loads:
-            self.__known_loads[p_dst] += o.get_time()
+            self.__known_loads[p_dst] += o.get_load()

     def add_as_viewer(self, ranks):
         """ Add self as viewer to known peers."""
@@ -109,15 +109,15 @@ def add_as_viewer(self, ranks):

     def get_load(self) -> float:
         """ Return total load on rank."""
-        return sum([o.get_time() for o in self.__migratable_objects.union(self.__sentinel_objects)])
+        return sum([o.get_load() for o in self.__migratable_objects.union(self.__sentinel_objects)])

     def get_migratable_load(self) -> float:
         """ Return migratable load on rank."""
-        return sum([o.get_time() for o in self.__migratable_objects])
+        return sum([o.get_load() for o in self.__migratable_objects])

     def get_sentinel_load(self) -> float:
         """ Return sentinel load on rank."""
-        return sum([o.get_time() for o in self.__sentinel_objects])
+        return sum([o.get_load() for o in self.__sentinel_objects])

     def get_received_volume(self):
         """ Return volume received by objects assigned to rank from other ranks."""
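Finally, the three renamed accessors in lbsRank.py preserve the invariant that a rank's total load is the sum of its migratable and sentinel loads, provided the two object sets are disjoint. A sketch with hypothetical stand-in objects:

# Sketch: get_load() over the union equals
# get_migratable_load() + get_sentinel_load().
class _Obj:
    def __init__(self, load): self._load = load
    def get_load(self): return self._load

migratable = {_Obj(1.0), _Obj(2.0)}   # objects that may be transferred
sentinel = {_Obj(0.5)}                # objects pinned to the rank

total = sum(o.get_load() for o in migratable | sentinel)
assert total == sum(o.get_load() for o in migratable) + sum(o.get_load() for o in sentinel)
print(total)  # 3.5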