Skip to content

Commit

Permalink
#279: completed refactoring of time -> load except for JSON files keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ppebay committed Nov 17, 2022
1 parent 6429124 commit 5ab79cd
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 83 deletions.
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def report_final_mapping(self, logger):
def execute(self, phases, distributions, statistics, a_min_max):
"""ExecuteBalancing algorithm on Phase instance — note: "Excecute" in the committed docstring is a typo for "Execute"
phases: list of Phase instances
distributions: dictionary of time-varying variables
distributions: dictionary of load-varying variables
statistics: dictionary of statistics
a_min_max: possibly empty list of optimal arrangements"""

Expand Down
6 changes: 3 additions & 3 deletions src/lbaf/Execution/lbsBruteForceAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def compute_arrangement_works(self, objects: tuple, arrangement: tuple) -> dict:
for rank, rank_object_ids in ranks.items():
# Compute load component for current rank
values = {
"load": sum([objects[i].get("time") for i in rank_object_ids])}
"load": sum([objects[i].get("load") for i in rank_object_ids])}

# Compute received communication volume
v = 0.0
Expand Down Expand Up @@ -88,7 +88,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, _):
entry = {
"id": o.get_id(),
"rank": rank,
"time": o.get_time(),
"load": o.get_load(),
"to": {},
"from": {}}
comm = o.get_communicator()
Expand Down Expand Up @@ -153,7 +153,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, _):
object_id = objects[i]["id"]
for o in p_src.get_objects():
if o.get_id() == object_id:
self.__logger.debug(f"transferring object {o.get_id()} ({o.get_time()}) to rank {p_dst.get_id()}")
self.__logger.debug(f"transferring object {o.get_id()} ({o.get_load()}) to rank {p_dst.get_id()}")
p_src.remove_migratable_object(o, p_dst)
p_dst.add_migratable_object(o)
o.set_rank_id(p_dst.get_id())
Expand Down
32 changes: 16 additions & 16 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def __init__(self, work_model, parameters: dict, lgr: Logger):
self.__strategy_mapped = {
"arbitrary": self.arbitrary,
"element_id": self.element_id,
"decreasing_times": self.decreasing_times,
"increasing_times": self.increasing_times,
"decreasing_loads": self.decreasing_loads,
"increasing_loads": self.increasing_loads,
"increasing_connectivity": self.increasing_connectivity,
"fewest_migrations": self.fewest_migrations,
"small_objects": self.small_objects}
Expand Down Expand Up @@ -280,7 +280,7 @@ def transfer_stage(self):
self.__logger.debug(f"Transferring {len(object_list)} object(s) at once")
for o in object_list:
self.__logger.debug(
f"transferring object {o.get_id()} ({o.get_time()}) to rank {p_dst.get_id()} "
f"transferring object {o.get_id()} ({o.get_load()}) to rank {p_dst.get_id()} "
f"(criterion: {c_dst})")
p_src.remove_migratable_object(o, p_dst)
p_dst.add_migratable_object(o)
Expand Down Expand Up @@ -367,14 +367,14 @@ def element_id(objects: set, _):
return sorted(objects, key=lambda x: x.get_id())

@staticmethod
def decreasing_times(objects: set, _):
""" Order objects by decreasing object times."""
return sorted(objects, key=lambda x: -x.get_time())
def decreasing_loads(objects: set, _):
""" Order objects by decreasing object loads."""
return sorted(objects, key=lambda x: -x.get_load())

@staticmethod
def increasing_times(objects: set, _):
""" Order objects by increasing object times."""
return sorted(objects, key=lambda x: x.get_time())
def increasing_loads(objects: set, _):
""" Order objects by increasing object loads."""
return sorted(objects, key=lambda x: x.get_load())

@staticmethod
def increasing_connectivity(objects: set, src_id):
Expand All @@ -399,14 +399,14 @@ def increasing_connectivity(objects: set, src_id):

@staticmethod
def sorted_ascending(objects: Union[set, list]):
return sorted(objects, key=lambda x: x.get_time())
return sorted(objects, key=lambda x: x.get_load())

@staticmethod
def sorted_descending(objects: Union[set, list]):
return sorted(objects, key=lambda x: -x.get_time())
return sorted(objects, key=lambda x: -x.get_load())

def load_excess(self, objects: set):
rank_load = sum([obj.get_time() for obj in objects])
rank_load = sum([obj.get_load() for obj in objects])
return rank_load - self.__average_load

def fewest_migrations(self, objects: set, _):
Expand All @@ -415,8 +415,8 @@ def fewest_migrations(self, objects: set, _):
Sort largest to the smallest if <= load_excess
Sort smallest to the largest if > load_excess"""
load_excess = self.load_excess(objects)
lt_load_excess = [obj for obj in objects if obj.get_time() <= load_excess]
get_load_excess = [obj for obj in objects if obj.get_time() > load_excess]
lt_load_excess = [obj for obj in objects if obj.get_load() <= load_excess]
get_load_excess = [obj for obj in objects if obj.get_load() > load_excess]
return self.sorted_descending(lt_load_excess) + self.sorted_ascending(get_load_excess)

def small_objects(self, objects: set, _):
Expand All @@ -426,6 +426,6 @@ def small_objects(self, objects: set, _):
Sort smallest to the largest if > load_excess"""
load_excess = self.load_excess(objects)
sorted_objects = self.sorted_ascending(objects)
accumulated_times = list(accumulate(obj.get_time() for obj in sorted_objects))
idx = bisect(accumulated_times, load_excess) + 1
accumulated_loads = list(accumulate(obj.get_load() for obj in sorted_objects))
idx = bisect(accumulated_loads, load_excess) + 1
return self.sorted_descending(sorted_objects[:idx]) + self.sorted_ascending(sorted_objects[idx:])
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsTemperedCriterion.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def compute(self, objects: list, p_src: Rank, p_dst: Rank) -> float:
w_max_0 = max(w_src_0, w_dst_0)

# Update loads in proposed new arrangement
object_loads = sum([o.get_time() for o in objects])
object_loads = sum([o.get_load() for o in objects])
values_src["load"] -= object_loads
values_dst["load"] += object_loads

Expand Down
16 changes: 8 additions & 8 deletions src/lbaf/IO/configurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
ALLOWED_STRATEGIES = (
"arbitrary",
"element_id",
"increasing_times",
"decreasing_times",
"increasing_loads",
"decreasing_loads",
"increasing_connectivity",
"fewest_migrations",
"small_objects")
Expand All @@ -22,7 +22,7 @@
"PhaseStepper")
ALLOWED_CRITERIA = ("Tempered", "StrictLocalizer")
ALLOWED_LOGGING_LEVELS = ("info", "debug", "warning", "error")
ALLOWED_TIME_VOLUME_SAMPLER = ("uniform", "lognormal")
ALLOWED_LOAD_VOLUME_SAMPLER = ("uniform", "lognormal")
ALLOWED_TERMINAL_BACKGROUND = ("light", "dark")


Expand Down Expand Up @@ -102,12 +102,12 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"n_mapped_ranks": And(int, lambda x: x >= 0,
error="Should be of type 'int' and >= 0"),
"communication_degree": int,
"time_sampler": {
"load_sampler": {
"name": And(
str,
Use(str.lower),
lambda a: a in ALLOWED_TIME_VOLUME_SAMPLER,
error=f"{get_error_message(ALLOWED_TIME_VOLUME_SAMPLER)} must be chosen"),
lambda a: a in ALLOWED_LOAD_VOLUME_SAMPLER,
error=f"{get_error_message(ALLOWED_LOAD_VOLUME_SAMPLER)} must be chosen"),
"parameters": And(
[float],
lambda s: len(s) == 2,
Expand All @@ -116,8 +116,8 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"name": And(
str,
Use(str.lower),
lambda b: b in ALLOWED_TIME_VOLUME_SAMPLER,
error=f"{get_error_message(ALLOWED_TIME_VOLUME_SAMPLER)} must be chosen"),
lambda b: b in ALLOWED_LOAD_VOLUME_SAMPLER,
error=f"{get_error_message(ALLOWED_LOAD_VOLUME_SAMPLER)} must be chosen"),
"parameters": And(
[float],
lambda s: len(s) == 2,
Expand Down
38 changes: 19 additions & 19 deletions src/lbaf/IO/lbsMeshBasedVisualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ def __init__(self, logger: Logger, phases: list, grid_size: list, object_jitter=
# Initialize maximum edge volume
self.__max_volume = 0.0

# Compute object time range
self.__time_range = [math.inf, 0.0]
# Compute object load range
self.__load_range = [math.inf, 0.0]
for p in self.__phases:
for r in p.get_ranks():
for o in r.get_objects():
# Update time range when necessary
time = o.get_time()
if time > self.__time_range[1]:
self.__time_range[1] = time
if time < self.__time_range[0]:
self.__time_range[0] = time
# Update load range when necessary
load = o.get_load()
if load > self.__load_range[1]:
self.__load_range[1] = load
if load < self.__load_range[0]:
self.__load_range[0] = load

# Assemble file and path names from constructor parameters
self.__rank_file_name = f"{output_file_stem}_rank_view.e"
Expand Down Expand Up @@ -215,7 +215,7 @@ def create_object_mesh(self, phase: Phase, object_mapping: set):
self.__logger.info(
f"Creating object mesh with {n_o} points and {n_e} edges")

# Create point array for object times
# Create point array for object loads
t_arr = vtk.vtkDoubleArray()
t_arr.SetName("Load")
t_arr.SetNumberOfTuples(n_o)
Expand Down Expand Up @@ -275,8 +275,8 @@ def create_object_mesh(self, phase: Phase, object_mapping: set):
(0.0, 0.0, 0.0))[d] + c) * o_resolution
for d, c in enumerate(self.global_id_to_cartesian(
i, rank_size))])
time = o.get_time()
t_arr.SetTuple1(point_index, time)
load = o.get_load()
t_arr.SetTuple1(point_index, load)
b_arr.SetTuple1(point_index, m)

# Update sent volumes
Expand Down Expand Up @@ -461,7 +461,7 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
renderer.AddActor(edge_actor)
renderer.AddActor2D(volume_actor)

# Compute square root of object times
# Compute square root of object loads
sqrtT = vtk.vtkArrayCalculator()
sqrtT.SetInputData(object_mesh)
sqrtT.AddScalarArrayName("Load")
Expand All @@ -486,7 +486,7 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
thresh_out.GetPointData().SetActiveScalars(
sqrtT_str)

# Glyph by square root of object times
# Glyph by square root of object loads
glyph = vtk.vtkGlyphSource2D()
getattr(glyph, f"SetGlyphTypeTo{v}")()
glyph.SetResolution(32)
Expand All @@ -513,16 +513,16 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
glyph_mapper.SetInputConnection(trans.GetOutputPort())
glyph_mapper.SetLookupTable(
self.create_color_transfer_function(
self.__time_range, "blue_to_red"))
glyph_mapper.SetScalarRange(self.__time_range)
self.__load_range, "blue_to_red"))
glyph_mapper.SetScalarRange(self.__load_range)
glyph_actor = vtk.vtkActor()
glyph_actor.SetMapper(glyph_mapper)
renderer.AddActor(glyph_actor)

# Create and add unique scalar bar for object time
time_actor = self.create_scalar_bar_actor(
# Create and add unique scalar bar for object load
load_actor = self.create_scalar_bar_actor(
glyph_mapper, "Object Load", 0.55, 0.05)
renderer.AddActor2D(time_actor)
renderer.AddActor2D(load_actor)

# Create text actor to indicate iteration
text_actor = vtk.vtkTextActor()
Expand Down Expand Up @@ -615,7 +615,7 @@ def generate(self, gen_meshes, gen_mulmed):
f"\tcommunication edges width: {edge_width:.2g}")
glyph_factor = self.__grid_resolution / (
(self.__max_o_per_dim + 1)
* math.sqrt(self.__time_range[1]))
* math.sqrt(self.__load_range[1]))
self.__logger.info(
f"\tobject glyphs scaling: {glyph_factor:.2g}")

Expand Down
4 changes: 2 additions & 2 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ def json_reader(self, returned_dict: dict, phase_id: int, node_id: int) -> tuple
phase_rank := Rank(node_id, logger=self.__logger))

# Iterate over tasks
for task in p["tasks"]:
task_time = task.get("time")
for task in phase["tasks"]:
task_load = task.get("time")
entity = task.get("entity")
task_object_id = entity.get("id")
task_used_defined = task.get("user_defined")
Expand Down
8 changes: 4 additions & 4 deletions src/lbaf/IO/lbsVTDataWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,21 @@ def json_writer(self, rank: Rank) -> str:
for o in rank.get_objects():
# Write object to file and increment count
try:
# writer.writerow([o.get_rank_id(), o.get_id(), o.get_time()])
# writer.writerow([o.get_rank_id(), o.get_id(), o.get_load()])
rank_id = o.get_rank_id()
obj_id = o.get_id()
obj_time = o.get_time()
obj_load = o.get_load()
if isinstance(temp_dict.get(rank_id, None), list):
temp_dict[rank_id].append({
"rank_id": rank_id,
"obj_id": obj_id,
"obj_time": obj_time})
"obj_load": obj_load})
else:
temp_dict[rank_id] = list()
temp_dict[rank_id].append({
"rank_id": rank_id,
"obj_id": obj_id,
"obj_time": obj_time})
"obj_load": obj_load})
except:
n_u += 1

Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/Model/lbsLoadOnlyWorkModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, _, lgr: Logger):
self.__logger.info("Instantiated concrete work model")

def compute(self, rank: Rank):
""" A work model summing all object times on given rank
""" A work model summing all object loads on given rank
"""
# Return total load on this rank
return rank.get_load()
Expand Down
30 changes: 15 additions & 15 deletions src/lbaf/Model/lbsObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,64 @@


class Object:
""" A class representing an object with time and communicator
""" A class representing an object with load and communicator
"""
def __init__(self, i: int, t: float, p: int = None, c: ObjectCommunicator = None, user_defined: dict = None,
subphases: list = None):
def __init__(
self, i: int, t: float, p: int = None, c: ObjectCommunicator = None, user_defined: dict = None, subphases: list = None):
# Object index
if not isinstance(i, int) or isinstance(i, bool):
sys.excepthook = exc_handler
raise TypeError(f"i: {i} is type of {type(i)}! Must be <class 'int'>!")
raise TypeError(f"i: {i} is of type {type(i)} but must be <class 'int'>")
else:
self.__index = i

# Time required to perform the work of this object
# Load required to perform the work of this object
if not isinstance(t, float):
sys.excepthook = exc_handler
raise TypeError(f"t: {t} is type of {type(t)}! Must be <class 'float'>!")
raise TypeError(f"t: {t} is of type {type(t)} but must be <class 'float'>")
else:
self.__time = t
self.__load = t

# Rank to which object is currently assigned if defined
if bool(isinstance(p, int) or p is None) and not isinstance(p, bool):
self.__rank_id = p
else:
sys.excepthook = exc_handler
raise TypeError(f"p: {p} is type of {type(p)}! Must be <class 'int'>!")
raise TypeError(f"p: {p} is of type {type(p)} Must be <class 'int'>")

# Communication graph of this object if defined
if isinstance(c, ObjectCommunicator) or c is None:
self.__communicator = c
else:
sys.excepthook = exc_handler
raise TypeError(f"c: {c} is type of {type(c)}! Must be <class 'ObjectCommunicator'>!")
raise TypeError(f"c: {c} is of type {type(c)} Must be <class 'ObjectCommunicator'>")

# User defined fields
if isinstance(user_defined, dict) or user_defined is None:
self.__user_defined = user_defined
else:
sys.excepthook = exc_handler
raise TypeError(f"user_defined: {user_defined} is type of {type(user_defined)}! Must be <class 'dict'>!")
raise TypeError(f"user_defined: {user_defined} is of type {type(user_defined)} but must be <class 'dict'>")

# Sub-phases
if isinstance(subphases, list) or subphases is None:
self.__subphases = subphases
else:
sys.excepthook = exc_handler
raise TypeError(f"subphases: {subphases} is type of {type(subphases)}! Must be <class 'list'>!")
raise TypeError(f"subphases: {subphases} is of type {type(subphases)} but must be <class 'list'>")

def __repr__(self):
return f"Object id: {self.__index}, time: {self.__time}"
return f"Object id: {self.__index}, load: {self.__load}"

def get_id(self) -> int:
""" Return object ID
"""
return self.__index

def get_time(self) -> float:
""" Return object time
def get_load(self) -> float:
""" Return object load
"""
return self.__time
return self.__load

def get_sent(self) -> dict:
""" Return communications sent by object to other objects
Expand Down
Loading

0 comments on commit 5ab79cd

Please sign in to comment.