#279: completed refactoring of time -> load except for JSON files keys

ppebay committed Oct 23, 2022
1 parent e35d1dd commit 9d069df
Showing 12 changed files with 84 additions and 84 deletions.
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsAlgorithmBase.py
@@ -115,7 +115,7 @@ def report_final_mapping(self, logger):
def execute(self, phases, distributions, statistics, a_min_max):
""" Excecute balancing algorithm on Phase instance
phases: list of Phase instances
- distributions: dictionary of time-varying variables
+ distributions: dictionary of load-varying variables
statistics: dictionary of statistics
a_min_max: possibly empty list of optimal arrangements"""

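For context, a minimal sketch of how a concrete algorithm could satisfy this renamed execute() contract; the subclass below is hypothetical and not part of this commit, and only the parameter names and the get_ranks()/get_load() accessors are taken from the code shown on this page.

# Hypothetical no-op balancer illustrating the execute() contract.
class NoOpAlgorithm(AlgorithmBase):
    def execute(self, phases, distributions, statistics, a_min_max):
        # Record the per-rank load distribution of the first phase and
        # leave every object on its current rank.
        loads = [r.get_load() for r in phases[0].get_ranks()]
        distributions.setdefault("load", []).append(loads)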
6 changes: 3 additions & 3 deletions src/lbaf/Execution/lbsBruteForceAlgorithm.py
@@ -43,7 +43,7 @@ def compute_arrangement_works(self, objects: tuple, arrangement: tuple) -> dict:
for rank, rank_object_ids in ranks.items():
# Compute load component for current rank
values = {
"load": sum([objects[i].get("time") for i in rank_object_ids])}
"load": sum([objects[i].get("load") for i in rank_object_ids])}

# Compute received communication volume
v = 0.0
@@ -88,7 +88,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, _):
entry = {
"id": o.get_id(),
"rank": rank,
"time": o.get_time(),
"load": o.get_load(),
"to": {},
"from": {}}
comm = o.get_communicator()
@@ -153,7 +153,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, _):
object_id = objects[i]["id"]
for o in p_src.get_objects():
if o.get_id() == object_id:
- self.__logger.debug(f"transferring object {o.get_id()} ({o.get_time()}) to rank {p_dst.get_id()}")
+ self.__logger.debug(f"transferring object {o.get_id()} ({o.get_load()}) to rank {p_dst.get_id()}")
p_src.remove_migratable_object(o, p_dst)
p_dst.add_migratable_object(o)
o.set_rank_id(p_dst.get_id())
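The first hunk above is the core of the brute-force evaluation: each candidate arrangement is scored by summing the loads of the objects assigned to each rank. A self-contained sketch of that computation, with illustrative values:

# Sketch: per-rank load component of compute_arrangement_works.
objects = ({"id": 0, "load": 1.5}, {"id": 1, "load": 0.5}, {"id": 2, "load": 2.0})
ranks = {0: [0, 2], 1: [1]}  # rank id -> indices of its assigned objects
values = {rank: {"load": sum(objects[i].get("load") for i in ids)}
          for rank, ids in ranks.items()}
# values == {0: {'load': 3.5}, 1: {'load': 0.5}}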
32 changes: 16 additions & 16 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
@@ -54,8 +54,8 @@ def __init__(self, work_model, parameters: dict, lgr: Logger):
self.__strategy_mapped = {
"arbitrary": self.arbitrary,
"element_id": self.element_id,
"decreasing_times": self.decreasing_times,
"increasing_times": self.increasing_times,
"decreasing_loads": self.decreasing_loads,
"increasing_loads": self.increasing_loads,
"increasing_connectivity": self.increasing_connectivity,
"fewest_migrations": self.fewest_migrations,
"small_objects": self.small_objects}
@@ -280,7 +280,7 @@ def transfer_stage(self):
self.__logger.debug(f"Transferring {len(object_list)} object(s) at once")
for o in object_list:
self.__logger.debug(
f"transferring object {o.get_id()} ({o.get_time()}) to rank {p_dst.get_id()} "
f"transferring object {o.get_id()} ({o.get_load()}) to rank {p_dst.get_id()} "
f"(criterion: {c_dst})")
p_src.remove_migratable_object(o, p_dst)
p_dst.add_migratable_object(o)
@@ -367,14 +367,14 @@ def element_id(objects: set, _):
return sorted(objects, key=lambda x: x.get_id())

@staticmethod
- def decreasing_times(objects: set, _):
- """ Order objects by decreasing object times."""
- return sorted(objects, key=lambda x: -x.get_time())
+ def decreasing_loads(objects: set, _):
+ """ Order objects by decreasing object loads."""
+ return sorted(objects, key=lambda x: -x.get_load())

@staticmethod
- def increasing_times(objects: set, _):
- """ Order objects by increasing object times."""
- return sorted(objects, key=lambda x: x.get_time())
+ def increasing_loads(objects: set, _):
+ """ Order objects by increasing object loads."""
+ return sorted(objects, key=lambda x: x.get_load())

@staticmethod
def increasing_connectivity(objects: set, src_id):
@@ -399,14 +399,14 @@

@staticmethod
def sorted_ascending(objects: Union[set, list]):
- return sorted(objects, key=lambda x: x.get_time())
+ return sorted(objects, key=lambda x: x.get_load())

@staticmethod
def sorted_descending(objects: Union[set, list]):
- return sorted(objects, key=lambda x: -x.get_time())
+ return sorted(objects, key=lambda x: -x.get_load())

def load_excess(self, objects: set):
- rank_load = sum([obj.get_time() for obj in objects])
+ rank_load = sum([obj.get_load() for obj in objects])
return rank_load - self.__average_load

def fewest_migrations(self, objects: set, _):
@@ -415,8 +415,8 @@ def fewest_migrations(self, objects: set, _):
Sort largest to the smallest if <= load_excess
Sort smallest to the largest if > load_excess"""
load_excess = self.load_excess(objects)
- lt_load_excess = [obj for obj in objects if obj.get_time() <= load_excess]
- get_load_excess = [obj for obj in objects if obj.get_time() > load_excess]
+ lt_load_excess = [obj for obj in objects if obj.get_load() <= load_excess]
+ get_load_excess = [obj for obj in objects if obj.get_load() > load_excess]
return self.sorted_descending(lt_load_excess) + self.sorted_ascending(get_load_excess)

def small_objects(self, objects: set, _):
@@ -426,6 +426,6 @@ def small_objects(self, objects: set, _):
Sort smallest to the largest if > load_excess"""
load_excess = self.load_excess(objects)
sorted_objects = self.sorted_ascending(objects)
- accumulated_times = list(accumulate(obj.get_time() for obj in sorted_objects))
- idx = bisect(accumulated_times, load_excess) + 1
+ accumulated_loads = list(accumulate(obj.get_load() for obj in sorted_objects))
+ idx = bisect(accumulated_loads, load_excess) + 1
return self.sorted_descending(sorted_objects[:idx]) + self.sorted_ascending(sorted_objects[idx:])
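Among the renamed orderings, small_objects is the subtlest: it takes the shortest ascending prefix whose accumulated load exceeds the rank's load excess, proposes that prefix largest-first, and appends the remaining objects in ascending order. A self-contained numeric sketch, with the Object API stubbed out by plain floats:

from bisect import bisect
from itertools import accumulate

loads = [0.5, 1.0, 2.0, 4.0]                 # object loads, sorted ascending
load_excess = 3.0                            # rank load minus average load
accumulated_loads = list(accumulate(loads))  # [0.5, 1.5, 3.5, 7.5]
idx = bisect(accumulated_loads, load_excess) + 1
ordered = sorted(loads[:idx], reverse=True) + loads[idx:]
# ordered == [2.0, 1.0, 0.5, 4.0]: the small objects that just cover the
# excess are proposed first (largest of them leading), the rest follow.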
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsTemperedCriterion.py
@@ -38,7 +38,7 @@ def compute(self, objects: list, p_src: Rank, p_dst: Rank) -> float:
w_max_0 = max(w_src_0, w_dst_0)

# Update loads in proposed new arrangement
- object_loads = sum([o.get_time() for o in objects])
+ object_loads = sum([o.get_load() for o in objects])
values_src["load"] -= object_loads
values_dst["load"] += object_loads

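The hunk above belongs to the criterion that compares maximum work before and after a proposed transfer. A worked numeric sketch with the work model reduced to pure load; all values are illustrative:

# Sketch: load-only view of the before/after comparison in compute().
values_src, values_dst = {"load": 10.0}, {"load": 4.0}
object_loads = 2.0 + 1.0                                 # transferred loads
w_max_0 = max(values_src["load"], values_dst["load"])    # 10.0 before
values_src["load"] -= object_loads                       # 7.0
values_dst["load"] += object_loads                       # 7.0
w_max_new = max(values_src["load"], values_dst["load"])  # 7.0 after
# A positive w_max_0 - w_max_new (here 3.0) marks a beneficial transfer.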
16 changes: 8 additions & 8 deletions src/lbaf/IO/configurationValidator.py
@@ -10,8 +10,8 @@
ALLOWED_STRATEGIES = (
"arbitrary",
"element_id",
"increasing_times",
"decreasing_times",
"increasing_loads",
"decreasing_loads",
"increasing_connectivity",
"fewest_migrations",
"small_objects")
@@ -22,7 +22,7 @@
"PhaseStepper")
ALLOWED_CRITERIA = ("Tempered", "StrictLocalizer")
ALLOWED_LOGGING_LEVELS = ("info", "debug", "warning", "error")
- ALLOWED_TIME_VOLUME_SAMPLER = ("uniform", "lognormal")
+ ALLOWED_LOAD_VOLUME_SAMPLER = ("uniform", "lognormal")
ALLOWED_TERMINAL_BACKGROUND = ("light", "dark")


@@ -102,12 +102,12 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"n_mapped_ranks": And(int, lambda x: x >= 0,
error="Should be of type 'int' and >= 0"),
"communication_degree": int,
"time_sampler": {
"load_sampler": {
"name": And(
str,
Use(str.lower),
- lambda a: a in ALLOWED_TIME_VOLUME_SAMPLER,
- error=f"{get_error_message(ALLOWED_TIME_VOLUME_SAMPLER)} must be chosen"),
+ lambda a: a in ALLOWED_LOAD_VOLUME_SAMPLER,
+ error=f"{get_error_message(ALLOWED_LOAD_VOLUME_SAMPLER)} must be chosen"),
"parameters": And(
[float],
lambda s: len(s) == 2,
@@ -116,8 +116,8 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"name": And(
str,
Use(str.lower),
- lambda b: b in ALLOWED_TIME_VOLUME_SAMPLER,
- error=f"{get_error_message(ALLOWED_TIME_VOLUME_SAMPLER)} must be chosen"),
+ lambda b: b in ALLOWED_LOAD_VOLUME_SAMPLER,
+ error=f"{get_error_message(ALLOWED_LOAD_VOLUME_SAMPLER)} must be chosen"),
"parameters": And(
[float],
lambda s: len(s) == 2,
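After this rename, configurations describing synthetically sampled data must use the load_sampler key. A hypothetical fragment the renamed schema would accept; the field values are illustrative, not taken from the commit:

config_fragment = {
    "n_mapped_ranks": 4,               # int >= 0
    "communication_degree": 2,
    "load_sampler": {
        "name": "lognormal",           # one of ALLOWED_LOAD_VOLUME_SAMPLER
        "parameters": [1.0, 10.0],     # exactly two floats
    },
}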
38 changes: 19 additions & 19 deletions src/lbaf/IO/lbsMeshBasedVisualizer.py
@@ -67,17 +67,17 @@ def __init__(self, logger: Logger, phases: list, grid_size: list, object_jitter=
# Initialize maximum edge volume
self.__max_volume = 0.0

- # Compute object time range
- self.__time_range = [math.inf, 0.0]
+ # Compute object load range
+ self.__load_range = [math.inf, 0.0]
for p in self.__phases:
for r in p.get_ranks():
for o in r.get_objects():
- # Update time range when necessary
- time = o.get_time()
- if time > self.__time_range[1]:
- self.__time_range[1] = time
- if time < self.__time_range[0]:
- self.__time_range[0] = time
+ # Update load range when necessary
+ load = o.get_load()
+ if load > self.__load_range[1]:
+ self.__load_range[1] = load
+ if load < self.__load_range[0]:
+ self.__load_range[0] = load

# Assemble file and path names from constructor parameters
self.__rank_file_name = f"{output_file_stem}_rank_view.e"
@@ -215,7 +215,7 @@ def create_object_mesh(self, phase: Phase, object_mapping: set):
self.__logger.info(
f"Creating object mesh with {n_o} points and {n_e} edges")

- # Create point array for object times
+ # Create point array for object loads
t_arr = vtk.vtkDoubleArray()
t_arr.SetName("Load")
t_arr.SetNumberOfTuples(n_o)
@@ -275,8 +275,8 @@ def create_object_mesh(self, phase: Phase, object_mapping: set):
(0.0, 0.0, 0.0))[d] + c) * o_resolution
for d, c in enumerate(self.global_id_to_cartesian(
i, rank_size))])
- time = o.get_time()
- t_arr.SetTuple1(point_index, time)
+ load = o.get_load()
+ t_arr.SetTuple1(point_index, load)
b_arr.SetTuple1(point_index, m)

# Update sent volumes
@@ -461,7 +461,7 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
renderer.AddActor(edge_actor)
renderer.AddActor2D(volume_actor)

- # Compute square root of object times
+ # Compute square root of object loads
sqrtT = vtk.vtkArrayCalculator()
sqrtT.SetInputData(object_mesh)
sqrtT.AddScalarArrayName("Load")
@@ -486,7 +486,7 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
thresh_out.GetPointData().SetActiveScalars(
sqrtT_str)

- # Glyph by square root of object times
+ # Glyph by square root of object loads
glyph = vtk.vtkGlyphSource2D()
getattr(glyph, f"SetGlyphTypeTo{v}")()
glyph.SetResolution(32)
@@ -513,16 +513,16 @@ def create_rendering_pipeline(self, iteration: int, pid: int, edge_width: int, g
glyph_mapper.SetInputConnection(trans.GetOutputPort())
glyph_mapper.SetLookupTable(
self.create_color_transfer_function(
self.__time_range, "blue_to_red"))
glyph_mapper.SetScalarRange(self.__time_range)
self.__load_range, "blue_to_red"))
glyph_mapper.SetScalarRange(self.__load_range)
glyph_actor = vtk.vtkActor()
glyph_actor.SetMapper(glyph_mapper)
renderer.AddActor(glyph_actor)

- # Create and add unique scalar bar for object time
- time_actor = self.create_scalar_bar_actor(
+ # Create and add unique scalar bar for object load
+ load_actor = self.create_scalar_bar_actor(
glyph_mapper, "Object Load", 0.55, 0.05)
- renderer.AddActor2D(time_actor)
+ renderer.AddActor2D(load_actor)

# Create text actor to indicate iteration
text_actor = vtk.vtkTextActor()
@@ -615,7 +615,7 @@ def generate(self, gen_meshes, gen_mulmed):
f"\tcommunication edges width: {edge_width:.2g}")
glyph_factor = self.__grid_resolution / (
(self.__max_o_per_dim + 1)
- * math.sqrt(self.__time_range[1]))
+ * math.sqrt(self.__load_range[1]))
self.__logger.info(
f"\tobject glyphs scaling: {glyph_factor:.2g}")

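Two of the hunks above cooperate: __init__ computes the global object load range, and generate() uses its maximum to scale glyphs. A condensed sketch of that pairing; the sample loads and grid parameters are assumptions:

import math

# Load range across all objects, as in the renamed __init__ hunk.
load_range = [math.inf, 0.0]
for load in (0.5, 3.0, 1.2):           # stand-ins for o.get_load()
    load_range[1] = max(load_range[1], load)
    load_range[0] = min(load_range[0], load)

# Glyph scaling as in generate(); grid values are illustrative.
grid_resolution, max_o_per_dim = 1.0, 2
glyph_factor = grid_resolution / (
    (max_o_per_dim + 1) * math.sqrt(load_range[1]))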
6 changes: 3 additions & 3 deletions src/lbaf/IO/lbsVTDataReader.py
@@ -216,7 +216,7 @@ def json_reader(self, returned_dict: dict, phase_ids, node_id: int) -> tuple:

# Iterate over tasks
for task in phase["tasks"]:
- task_time = task.get("time")
+ task_load = task.get("time")
entity = task.get("entity")
task_object_id = entity.get("id")
task_used_defined = task.get("user_defined")
@@ -225,7 +225,7 @@ def json_reader(self, returned_dict: dict, phase_ids, node_id: int) -> tuple:
# Update rank if iteration was requested
if phase_ids in (phase_id, -1):
# Instantiate object with retrieved parameters
- obj = Object(task_object_id, task_time, node_id, user_defined=task_used_defined,
+ obj = Object(task_object_id, task_load, node_id, user_defined=task_used_defined,
subphases=subphases)
# If this iteration was never encountered initialize rank object
returned_dict.setdefault(phase_id, Rank(node_id, logger=self.__logger))
@@ -236,6 +236,6 @@ def json_reader(self, returned_dict: dict, phase_ids, node_id: int) -> tuple:
returned_dict[phase_id].add_sentinel_object(obj)

# Print debug information when requested
self.__logger.debug(f"Added object {task_object_id}, time = {task_time} to phase {phase_id}")
self.__logger.debug(f"Added object {task_object_id}, load = {task_load} to phase {phase_id}")

return returned_dict, comm_dict
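This file is the boundary the commit title calls out: the JSON key stays "time" on disk while the in-memory name becomes a load. A minimal sketch of that seam; the task record is illustrative:

# Legacy JSON key "time" is read into the renamed load variable.
task = {"time": 3.5, "entity": {"id": 7}}  # illustrative task record
task_load = task.get("time")               # key unchanged on disk...
print(f"load = {task_load}")               # ...reported as a load in memory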
8 changes: 4 additions & 4 deletions src/lbaf/IO/lbsVTDataWriter.py
@@ -56,21 +56,21 @@ def json_writer(self, rank: Rank) -> str:
for o in rank.get_objects():
# Write object to file and increment count
try:
- # writer.writerow([o.get_rank_id(), o.get_id(), o.get_time()])
+ # writer.writerow([o.get_rank_id(), o.get_id(), o.get_load()])
rank_id = o.get_rank_id()
obj_id = o.get_id()
- obj_time = o.get_time()
+ obj_load = o.get_load()
if isinstance(temp_dict.get(rank_id, None), list):
temp_dict[rank_id].append({
"rank_id": rank_id,
"obj_id": obj_id,
"obj_time": obj_time})
"obj_load": obj_load})
else:
temp_dict[rank_id] = list()
temp_dict[rank_id].append({
"rank_id": rank_id,
"obj_id": obj_id,
"obj_time": obj_time})
"obj_load": obj_load})
except:
n_u += 1

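A side note on the branching above, which this commit leaves unchanged: the isinstance check and its else branch can be collapsed with setdefault. An equivalent sketch with illustrative stand-in values:

# Equivalent grouping of the renamed records by rank via setdefault.
temp_dict = {}
rank_id, obj_id, obj_load = 0, 42, 1.25    # stand-ins for the o.get_*() calls
temp_dict.setdefault(rank_id, []).append({
    "rank_id": rank_id,
    "obj_id": obj_id,
    "obj_load": obj_load})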
2 changes: 1 addition & 1 deletion src/lbaf/Model/lbsLoadOnlyWorkModel.py
@@ -21,7 +21,7 @@ def __init__(self, _, lgr: Logger):
self.__logger.info("Instantiated concrete work model")

def compute(self, rank: Rank):
""" A work model summing all object times on given rank
""" A work model summing all object loads on given rank
"""
# Return total load on this rank
return rank.get_load()
(diffs for the remaining 3 of the 12 changed files were not loaded on this page)
