Skip to content

Commit

Permalink
#279: shared level memory
Browse files Browse the repository at this point in the history
  • Loading branch information
ppebay committed Nov 17, 2022
1 parent bb08da9 commit 1176963
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 7 deletions.
15 changes: 15 additions & 0 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ def main(self):
lambda x: x.get_max_object_level_memory(),
"initial rank object-level memory",
self.logger)
lbstats.print_function_statistics(
phase_0.get_ranks(),
lambda x: x.get_size(),
"initial rank working memory",
self.logger)
lbstats.print_function_statistics(
phase_0.get_ranks(),
lambda x: x.get_shared(),
"initial rank shared memory",
self.logger)
lbstats.print_function_statistics(
phase_0.get_edges().values(),
lambda x: x,
Expand Down Expand Up @@ -371,6 +381,11 @@ def main(self):
lambda x: x.get_max_object_level_memory(),
"final rank object-level memory",
self.logger)
lbstats.print_function_statistics(
phase_0.get_ranks(),
lambda x: x.get_size(),
"final rank working memory",
self.logger)
lbstats.print_function_statistics(
phase_0.get_edges().values(),
lambda x: x,
Expand Down
22 changes: 17 additions & 5 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ def read(self, node_id: int, phase_id: int = -1, comm: bool = False) -> tuple:
"""

# Retrieve communications from JSON reader
iter_map = {}
iter_map, comm = self.json_reader(
returned_dict=iter_map,
iter_dict = {}
iter_dict, comm = self.json_reader(
returned_dict=iter_dict,
phase_id=phase_id,
node_id=node_id)

# Return map of populated ranks per iteration
return iter_map, comm
return iter_dict, comm

def read_iteration(self, phase_id: int) -> list:
""" Read all the data in the range of ranks [0..n_p] for a given iteration `phase_id`.
Expand Down Expand Up @@ -234,7 +234,13 @@ def json_reader(self, returned_dict: dict, phase_id: int, node_id: int) -> tuple

# Instantiante and store rank for current phase
returned_dict[curr_phase_id] = (
phase_rank := Rank(node_id, logger=self.__logger))
phase_rank := Rank(
node_id,
logger=self.__logger,
size=task_user_defined.get("rank_working_bytes")))

# Initialize storage for shared blocks information
shared_blocks = {}

# Iterate over tasks
for task in phase["tasks"]:
Expand All @@ -244,6 +250,11 @@ def json_reader(self, returned_dict: dict, phase_id: int, node_id: int) -> tuple
task_id = task_entity.get("id")
task_user_defined = task.get("user_defined")

# Update share block information as needed
if (shared_id := task_user_defined.get("shared_id", -1)) > -1:
shared_blocks[
shared_id] = task_user_defined.get("shared_bytes", 0.0)

# Instantiate object with retrieved parameters
obj = Object(
task_object_id,
Expand All @@ -262,5 +273,6 @@ def json_reader(self, returned_dict: dict, phase_id: int, node_id: int) -> tuple
self.__logger.debug(
f"Added object {task_object_id}, time = {task_time} to phase {curr_phase_id}")

print(phase_id, node_id, task_user_defined.get("rank_working_bytes"), sum(shared_blocks.values()))
# Returned dictionaries of rank/objects and communicators per phase
return returned_dict, comm_dict
2 changes: 1 addition & 1 deletion src/lbaf/Model/lbsObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
else:
self.__load = load

# Nonnegative size required to store this object
# Nonnegative size required to for memory footprint of this object
if not isinstance(size, float) or size < 0.0:
sys.excepthook = exc_handler
raise TypeError(
Expand Down
36 changes: 35 additions & 1 deletion src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ class Rank:
""" A class representing a rank to which objects are assigned
"""

def __init__(self, i: int, logger: Logger, mo: set = None, so: set = None):
def __init__(
self,
i: int,
logger: Logger,
mo: set = None,
so: set = None,
size: float=0.0,
shared: float=0.0):

# Assign logger to instance variable
self.__logger = logger

Expand All @@ -25,6 +33,22 @@ def __init__(self, i: int, logger: Logger, mo: set = None, so: set = None):
for o in so:
self.__sentinel_objects.add(o)

# Nonnegative size required to for memory footprint of this rank
if not isinstance(size, float) or size < 0.0:
sys.excepthook = exc_handler
raise TypeError(
f"size: incorrect type {type(size)} or value: {size}")
else:
self.__size = size

# Nonnegative size required to for shared memory of this rank
if not isinstance(shared, float) or shared < 0.0:
sys.excepthook = exc_handler
raise TypeError(
f"shared: incorrect type {type(shared)} or value: {shared}")
else:
self.__shared = shared

# No information about peers is known initially
self.__known_loads = {}

Expand All @@ -41,6 +65,16 @@ def get_id(self) -> int:
""" Return rank ID."""
return self.__index

def get_size(self) -> float:
""" Return object size
"""
return self.__size

def get_shared(self) -> float:
""" Return object shared memory
"""
return self.__shared

def get_objects(self) -> set:
""" Return all objects assigned to rank."""
return self.__migratable_objects.union(self.__sentinel_objects)
Expand Down

0 comments on commit 1176963

Please sign in to comment.